Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CI][Bugfix] Refine ci tests and revert many-to-many migration commit to avoid ci tests failure #74

Merged
merged 41 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
8faeefc
[Bugfix] Address Request Status Changes During Migration Asynchronous…
KuilongCui Nov 18, 2024
2f9c264
update error log, kill bench process
KuilongCui Nov 21, 2024
9b37af8
remove hacker
KuilongCui Nov 21, 2024
9b2e765
fix
KuilongCui Nov 21, 2024
43df148
[CI] Fix and refine ci tests to avoid unexpected failure (#79)
s5u13b Nov 27, 2024
d9f0eb3
Fix lint
s5u13b Nov 27, 2024
a7a9f1a
Fix bakeup error_log
s5u13b Nov 27, 2024
1ec320d
Refine cleanup_ray_env
s5u13b Nov 27, 2024
9a101f1
More detailed pytest output
s5u13b Nov 27, 2024
12cc9d6
Temp try to fix e2e test
s5u13b Nov 27, 2024
fdceebc
Fix lint & Broader except
s5u13b Nov 27, 2024
f235202
Change subprocess.run check to False
s5u13b Nov 27, 2024
fdc898f
Minor
s5u13b Nov 27, 2024
5e80827
Change bash to sh in run test script
s5u13b Nov 27, 2024
fa6174a
Disable many-to-many migration temporarily
s5u13b Nov 27, 2024
d160224
Add pytest_runtest_makereport
s5u13b Nov 27, 2024
9db0381
Fix
s5u13b Nov 28, 2024
0d10de0
Refine subprocess.run and fixture
s5u13b Nov 28, 2024
d28afc0
Change subprocess.run check
s5u13b Nov 28, 2024
0e56a16
Fix
s5u13b Nov 28, 2024
9c5dc28
Refine -x
s5u13b Nov 28, 2024
37e976e
More strict migration test
s5u13b Nov 28, 2024
97535fa
Fix
s5u13b Nov 28, 2024
f0ba0e9
Minor
s5u13b Nov 29, 2024
967881f
Rename queue & Refine log
s5u13b Dec 5, 2024
1965489
Fix lint
s5u13b Dec 5, 2024
5c4af53
Fix
s5u13b Dec 5, 2024
0d91fd5
Change logger
s5u13b Dec 5, 2024
d59fae7
Add log for debugging ci
s5u13b Dec 5, 2024
b739dfb
Change request num from 1000 to 500 for ci debugging
s5u13b Dec 5, 2024
62266c8
Change from 500 to 300
s5u13b Dec 5, 2024
d615cf7
debugging ci
s5u13b Dec 5, 2024
491eb59
Fix
s5u13b Dec 5, 2024
c48af6c
Fix max-num-batched-tokens
s5u13b Dec 9, 2024
17e4c23
Fix exception handling of migration
s5u13b Dec 9, 2024
c17cec4
Revert "[Core] Support one-to-many and many-to-one migration (#63)"
s5u13b Dec 9, 2024
98517a7
Fix unit_test
s5u13b Dec 9, 2024
c81ea97
Fix e2e test
s5u13b Dec 9, 2024
109a7ef
Refine log
s5u13b Dec 9, 2024
77996ca
Fix lint
s5u13b Dec 9, 2024
3d388f9
Fix comments
s5u13b Dec 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions .github/workflows/offline_inference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,4 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Run offline inference example
run: |
nvidia-docker run --rm -t --net host --ipc host \
s5u13b marked this conversation as resolved.
Show resolved Hide resolved
-v ${PWD}:/workspace \
-w /workspace \
registry.cn-beijing.aliyuncs.com/llumnix/llumnix-dev:20240909_action_678a439 \
bash -c "pip install -e . > /dev/null && make offline_test"
run: ./tools/offline_test.sh
1 change: 1 addition & 0 deletions llumnix/backends/vllm/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import Dict, List
import math
import torch
import ray

from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy, NodeAffinitySchedulingStrategy
from vllm.utils import is_pin_memory_available
Expand Down
15 changes: 8 additions & 7 deletions llumnix/llm_engine_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,14 @@ async def migrate_done_callback(ret, migrate_instance_pair: Tuple[str, str]) ->
# TODO(s5u13b): Add more exception types for failover.
if isinstance(ret, (ray.exceptions.RayActorError, ray.exceptions.RayTaskError, KeyError)):
has_error_pair = await self._check_instance_error(migrate_instance_pair)
for i, has_error in enumerate(has_error_pair):
# Instance without error should clear migration states.
if not has_error:
try:
await self.instances[migrate_instance_pair[i]].clear_migration_states.remote(is_migrate_in=bool(i))
except (ray.exceptions.RayActorError, ray.exceptions.RayTaskError, KeyError):
has_error = True
# TODO(s5u13b): clear_migration_states by instance_id
# for i, has_error in enumerate(has_error_pair):
# # Instance without error should clear migration states.
# if not has_error:
# try:
# await self.instances[migrate_instance_pair[i]].clear_migration_states.remote(is_migrate_in=bool(i))
# except (ray.exceptions.RayActorError, ray.exceptions.RayTaskError, KeyError):
# has_error = True
for i, has_error in enumerate(has_error_pair):
if has_error:
instance_id = migrate_instance_pair[i]
Expand Down
8 changes: 8 additions & 0 deletions llumnix/llumlet/llumlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ async def migrate_out(self, dst_instance_name: str) -> List[str]:
migrate_out_requests = self.migration_scheduler.get_migrate_out_requests()
if len(migrate_out_requests) == 0:
return []

for migrate_out_request in migrate_out_requests:
migrate_out_request.is_migrating = True

migrated_request_list = []
for migrate_out_request in migrate_out_requests:
migrated_request = await self._migrate_out_one_request(migrate_out_request, dst_instance_name)
Expand All @@ -148,12 +152,16 @@ async def _migrate_out_one_request(self, migrate_out_request: LlumnixRequest, ds
dst_instance_id = dst_instance_name[len("instance_"):]
logger.info("{}->{} begin migrate out".format(self.instance_id, dst_instance_id))
migrated_request = []

if migrate_out_request.status == RequestStatus.RUNNING:
migrate_out_request.migration_start_time = time.time()
s5u13b marked this conversation as resolved.
Show resolved Hide resolved
status = await self.migration_coordinator.migrate_out_running_request(migrate_in_ray_actor, migrate_out_request)
elif migrate_out_request.status == RequestStatus.WAITING:
migrate_out_request.migration_start_time = time.time()
status = await self.migration_coordinator.migrate_out_waiting_request(migrate_in_ray_actor, migrate_out_request)
else:
return migrated_request

if status == MigrationStatus.FINISHED:
await migrate_in_ray_actor.execute_engine_method.remote("commit_dst_request", migrate_out_request)
self.backend_engine.free_src_request(migrate_out_request)
Expand Down
2 changes: 2 additions & 0 deletions llumnix/llumlet/local_migration_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def _filter_running_queue(self, running, min_request_len, max_request_len):
if request.status == RequestStatus.RUNNING \
and request.inference_type == RequestInferenceType.DECODE \
and min_request_len < request.request_len < max_request_len \
and (not request.is_migrating) \
]
return filtered_running

Expand All @@ -67,6 +68,7 @@ def _filter_waiting_queue(self, waiting, min_request_len, max_request_len):
if request.status == RequestStatus.WAITING \
and request.try_schedule_times >= 1 \
and min_request_len < request.request_len < max_request_len \
and (not request.is_migrating) \
]
return filtered_waiting

Expand Down
6 changes: 6 additions & 0 deletions llumnix/llumlet/migration_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ async def _migrate_out_onestage(self,
migrate_out_request: LlumnixRequest) -> "MigrationStatus":
"""one-stage live migration until last stage for a running request
"""
if migrate_out_request.should_abort_migration():
return MigrationStatus.ABORTED_SRC

pre_stage_num_blocks = sum(migrate_out_request.stage_num_blocks_list)
incremental_blocks = self.backend_engine.get_request_incremental_blocks(migrate_out_request, pre_stage_num_blocks)
# live migration, transfer all blocks except last one(currently updating)
Expand Down Expand Up @@ -129,12 +132,15 @@ async def _migrate_out_onestage(self,
self.backend_engine.add_running_request(migrate_out_request)
self.backend_engine.remove_migrating_out_request_last_stage(migrate_out_request)
return MigrationStatus.ABORTED_DST
if migrate_out_request.should_abort_migration():
return MigrationStatus.ABORTED_SRC

# do stage send/recv
migrate_out_request.stage_timestamps.append(time.time())
migrate_out_request.stage_num_blocks_list.append(stage_block_num)
# TODO(ZeldaHuang): send_blocks in migrate_in_pre_alloc/migrate_in_last_stage
await self.backend_engine.send_blocks(migrate_in_ray_actor, src_blocks, dst_blocks)

if not is_last_stage and migrate_out_request.should_abort_migration():
# migrate-out request abort by scheduler during send/recv
return MigrationStatus.ABORTED_SRC
Expand Down
12 changes: 10 additions & 2 deletions llumnix/llumlet/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def __init__(self, request_id: int, server_info: ServerInfo, expected_steps: int
self.stage_num_blocks_list = []
self.try_schedule_times = 0
self._status = None
self.migration_start_time = None
self.is_migrating = False

# end-of-migration, for multiple requests migration
self.eom = False
Expand All @@ -53,11 +55,15 @@ def reset_migration_args_dst(self):
self.stage_timestamps = []
self.stage_num_blocks_list = []
self.try_schedule_times = 0
self.migration_start_time = None
self.is_migrating = False

def reset_migration_args_src(self):
self.last_preemption_time = None
self.stage_timestamps = []
self.stage_num_blocks_list = []
self.migration_start_time = None
self.is_migrating = False

def reset_status(self):
self._status = None
Expand Down Expand Up @@ -104,5 +110,7 @@ def blocking_migration(self) -> bool:
return self.output_len >= self.expected_steps

def should_abort_migration(self) -> bool:
return self.finished \
or (self.last_preemption_time is not None and self.last_preemption_time > self.stage_timestamps[-1])
begin_time = self.stage_timestamps[-1] if len(self.stage_timestamps) > 0 else self.migration_start_time
s5u13b marked this conversation as resolved.
Show resolved Hide resolved
preempted = self.last_preemption_time is not None and self.last_preemption_time > begin_time

return self.finished or preempted
50 changes: 33 additions & 17 deletions tests/e2e_test/test_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
import json
import os
import subprocess
import pytest
import torch
import numpy as np

from .test_e2e import generate_launch_command, clear_ray_state
# pylint: disable=unused-import
from .utils import to_markdown_table, setup_ray_env
from .utils import to_markdown_table

def launch_llumnix_service(command):
subprocess.run(command, shell=True, check=True)
Expand Down Expand Up @@ -91,35 +91,51 @@ def get_markdown_data(key: str, head_name: str):
@pytest.mark.asyncio
@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="at least 1 gpus required for simple benchmark")
@pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
async def test_simple_benchmark(setup_ray_env, model):
async def test_simple_benchmark(model):
device_count = torch.cuda.device_count()
base_port = 37037
for i in range(device_count):
launch_command = generate_launch_command(result_filename=str(base_port+i)+".out",
launch_ray_cluster=False, port=base_port+i, model=model)
subprocess.run(launch_command, shell=True, check=True)

await asyncio.sleep(60)
await asyncio.sleep(30)

async def run_bench_command(command):
process = await asyncio.create_subprocess_shell(command)
await process.wait()
assert process.returncode == 0
def run_bench_command(command):
s5u13b marked this conversation as resolved.
Show resolved Hide resolved
# pylint: disable=consider-using-with
process = subprocess.Popen(command, shell=True)
return process

tasks = []
for i in range(device_count):
bench_command = generate_bench_command(ip_ports=f"127.0.0.1:{base_port+i}", model=model, num_prompts=300,
dataset_type="sharegpt",
dataset_path="/mnt/dataset/sharegpt_gpt4/sharegpt_gpt4.jsonl" ,
qps=2,
results_filename=f"{base_port+i}.out")
tasks.append(run_bench_command(bench_command))

await asyncio.wait(tasks, timeout=60*30)
bench_command = generate_bench_command(
ip_ports=f"127.0.0.1:{base_port + i}",
model=model,
num_prompts=200,
KuilongCui marked this conversation as resolved.
Show resolved Hide resolved
dataset_type="sharegpt",
dataset_path="/mnt/dataset/sharegpt_gpt4/sharegpt_gpt4.jsonl",
qps=5,
results_filename=f"{base_port + i}.out"
)
tasks.append(bench_command)

with ThreadPoolExecutor() as executor:
s5u13b marked this conversation as resolved.
Show resolved Hide resolved
future_to_command = {executor.submit(run_bench_command, command): command for command in tasks}

for future in as_completed(future_to_command):
try:
process = future.result()
process.wait(timeout=60*30)
assert process.returncode == 0, "bench_test failed with return code {}.".format(process.returncode)
# pylint: disable=broad-except
except subprocess.TimeoutExpired:
process.kill()
print("bench_test timed out after 30 minutes.")

with open("performance.txt", "w", encoding="utf-8") as f:
f.write(parse_log_file())

# TODO(KuilongCui): change clear_state function to fixture
shutdown_llumnix_service()
clear_ray_state()
await asyncio.sleep(3)
4 changes: 1 addition & 3 deletions tests/e2e_test/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import torch

from vllm import LLM, SamplingParams
# pylint: disable=unused-import
from .utils import setup_ray_env

def parse_launch_mode(launch_mode: str):
# 'eief' means that enable init instance by manager and enable fixed node init instance, and so on.
Expand Down Expand Up @@ -140,7 +138,7 @@ def run_vllm(model, max_model_len, sampling_params):
@pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B'])
@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo'])
@pytest.mark.parametrize("launch_mode", ['eief', 'eidf', 'dief', 'didf'])
async def test_e2e(setup_ray_env, model, migration_backend, launch_mode):
async def test_e2e(model, migration_backend, launch_mode):
if migration_backend == 'gloo' and launch_mode != 'eief':
pytest.skip("When the migration backend is gloo, the launch mode of llumnix can only be eief")
max_model_len = 370
Expand Down
67 changes: 41 additions & 26 deletions tests/e2e_test/test_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
from collections import defaultdict
import re
import subprocess
import pytest
import torch
import pandas as pd

from .test_e2e import generate_launch_command
from .test_bench import generate_bench_command, clear_ray_state, shutdown_llumnix_service
# pylint: disable=unused-import
from .utils import to_markdown_table, setup_ray_env
from .utils import to_markdown_table

size_pattern = re.compile(r'total_kv_cache_size:\s*([\d.]+)\s*(B|KB|MB|GB|KB|TB)')
speed_pattern = re.compile(r'speed:\s*([\d.]+)GB/s')
Expand All @@ -49,7 +49,9 @@ def parse_instance_log_file(log_files):

speeds.sort()
trimmed_speeds = speeds[1:-1]
average_speed[transfer_size] = sum(trimmed_speeds) / len(trimmed_speeds)

if len(trimmed_speeds) > 0:
average_speed[transfer_size] = sum(trimmed_speeds) / len(trimmed_speeds)

assert len(average_speed) > 0, "Migration should have occurred, but it was not detected. "

Expand Down Expand Up @@ -86,31 +88,44 @@ async def test_migration_benchmark(model, migration_backend, migrated_request_st
log_instance_info=True,
request_migration_policy=request_migration_policy)
subprocess.run(launch_command, shell=True, check=True)
await asyncio.sleep(5)
await asyncio.sleep(30)

async def run_bench_command(command):
process = await asyncio.create_subprocess_shell(command)
await process.wait()
assert process.returncode == 0
def run_bench_command(command):
# pylint: disable=consider-using-with
process = subprocess.Popen(command, shell=True)
return process

tasks = []
for i in range(device_count//2):
bench_command = generate_bench_command(ip_ports=f"127.0.0.1:{base_port+i}", model=model, num_prompts=300,
dataset_type="sharegpt",
dataset_path="/mnt/dataset/sharegpt_gpt4/sharegpt_gpt4.jsonl" ,
qps=10,
results_filename=f"{base_port+i}.out")
tasks.append(asyncio.create_task(run_bench_command(bench_command)))

_, pending = await asyncio.wait(tasks, timeout=60*30)

await asyncio.sleep(10)

if len(pending) > 0:
raise RuntimeError("migration task Timeout")

parse_manager_log_file("manager_instance.csv")
for i in range(device_count // 2):
bench_command = generate_bench_command(
ip_ports=f"127.0.0.1:{base_port + i}",
model=model,
num_prompts=300,
dataset_type="sharegpt",
dataset_path="/mnt/dataset/sharegpt_gpt4/sharegpt_gpt4.jsonl",
qps=10,
results_filename=f"{base_port + i}.out"
)
tasks.append(bench_command)

# Execute the commands concurrently using ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
future_to_command = {executor.submit(run_bench_command, command): command for command in tasks}

for future in as_completed(future_to_command):
try:
process = future.result()
process.wait(timeout=60*30)
assert process.returncode == 0, "migration_test failed with return code {}.".format(process.returncode)
# pylint: disable=broad-except
except subprocess.TimeoutExpired:
process.kill()
print("bench_test timed out after 30 minutes.")

await asyncio.sleep(5)

# TODO(s5u13b): use a more definitive way to determine that there is no memory leak.
# parse_manager_log_file("manager_instance.csv")

if migrated_request_status == 'running':
average_speed = parse_instance_log_file(instance_output_logs)
Expand All @@ -124,4 +139,4 @@ async def run_bench_command(command):

shutdown_llumnix_service()
clear_ray_state()
await asyncio.sleep(10)
await asyncio.sleep(3)
12 changes: 0 additions & 12 deletions tests/e2e_test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import subprocess
import pytest

def to_markdown_table(data):
headers = data[0]
rows = data[1:]
Expand All @@ -31,11 +27,3 @@ def to_markdown_table(data):

table = f"{header_row}\n{separator_row}\n" + "\n".join(data_rows) + "\n\n"
return table

@pytest.fixture
def setup_ray_env():
subprocess.run(["ray", "stop"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.run(["ray", "start", "--head", "--disable-usage-stats", "--port=6379"], check=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
yield
subprocess.run(["ray", "stop"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
4 changes: 4 additions & 0 deletions tests/unit_test/global_scheduler/test_llm_engine_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,7 @@ def test_update_instance_info_loop_and_migrate(setup_ray_env, engine_manager):
assert num_migrate_in == 0 and num_migrate_out > 1
else:
assert num_migrate_in == 0 and num_migrate_out == 0

@pytest.mark.skip("Not implemented yet")
def test_concurrent_migrate(setup_ray_env):
pass
2 changes: 2 additions & 0 deletions tools/bench_test.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#!/bin/bash
set -ex

pgrep -f llumnix.entrypoints.vllm.api_server | { while read pid; do kill -9 "$pid"; done; }

nvidia-docker run --rm -t --net host --ipc host -v ${PWD}:/workspace -v /mnt:/mnt -w /workspace \
registry.cn-beijing.aliyuncs.com/llumnix/llumnix-dev:20240909_action_678a439 \
bash -c "pip install -e . > /dev/null && make bench_test"
Loading
Loading