diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml new file mode 100644 index 00000000..b69beab9 --- /dev/null +++ b/.github/workflows/bench.yml @@ -0,0 +1,44 @@ +name: BENCH + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + cancel_previous_workflows: + runs-on: [self-hosted] + timeout-minutes: 3 + steps: + - uses: styfle/cancel-workflow-action@0.12.1 + with: + all_but_latest: true + + bench_tests: + needs: cancel_previous_workflows + runs-on: [self-hosted] + timeout-minutes: 60 + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Kill Running Containers + run: | + [[ -n $(docker ps -q) ]] && docker kill $(docker ps -q) || echo "No running containers to kill." + - name: Build And Test + run: ./tools/bench_test.sh + - name: Create comment from file + uses: actions/github-script@v7 + with: + script: | + const fs = require('fs'); + const filePath = 'performance.txt'; + const commentBody = fs.readFileSync(filePath, 'utf8'); + await github.rest.issues.createComment({ + issue_number: context.issue.number, + owner: context.repo.owner, + repo: context.repo.repo, + body: commentBody + }); diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml new file mode 100644 index 00000000..61f53a34 --- /dev/null +++ b/.github/workflows/e2e.yml @@ -0,0 +1,31 @@ +name: E2E + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + cancel_previous_workflows: + runs-on: [self-hosted] + timeout-minutes: 3 + steps: + - uses: styfle/cancel-workflow-action@0.12.1 + with: + all_but_latest: true + + e2e_tests: + needs: cancel_previous_workflows + runs-on: [self-hosted] + timeout-minutes: 60 + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Kill Running Containers + run: | + [[ -n $(docker ps -q) ]] && docker kill $(docker ps -q) || echo "No running containers to kill." + - name: Build And Test + run: ./tools/e2e_test.sh diff --git a/.github/workflows/migration.yml b/.github/workflows/migration.yml new file mode 100644 index 00000000..a12794ed --- /dev/null +++ b/.github/workflows/migration.yml @@ -0,0 +1,44 @@ +name: MIGRAION + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + cancel_previous_workflows: + runs-on: [self-hosted] + timeout-minutes: 3 + steps: + - uses: styfle/cancel-workflow-action@0.12.1 + with: + all_but_latest: true + + migration_tests: + needs: cancel_previous_workflows + runs-on: [self-hosted] + timeout-minutes: 60 + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Kill Running Containers + run: | + [[ -n $(docker ps -q) ]] && docker kill $(docker ps -q) || echo "No running containers to kill." + - name: Build And Test + run: ./tools/migration_test.sh + - name: Create comment from file + uses: actions/github-script@v7 + with: + script: | + const fs = require('fs'); + const filePath = 'performance.txt'; + const commentBody = fs.readFileSync(filePath, 'utf8'); + await github.rest.issues.createComment({ + issue_number: context.issue.number, + owner: context.repo.owner, + repo: context.repo.repo, + body: commentBody + }); \ No newline at end of file diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml index cdb2aa7e..e3e0e22b 100644 --- a/.github/workflows/pylint.yml +++ b/.github/workflows/pylint.yml @@ -1,4 +1,4 @@ -name: Pylint +name: PYLINT on: push: @@ -9,21 +9,24 @@ on: - main jobs: - build: - runs-on: ubuntu-latest - strategy: - matrix: - python-version: ["3.8", "3.9", "3.10"] + cancel_previous_workflows: + runs-on: [self-hosted] + timeout-minutes: 3 steps: - - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v3 + - uses: styfle/cancel-workflow-action@0.12.1 with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install pylint==2.12.2 + all_but_latest: true + + pylint_test: + needs: cancel_previous_workflows + runs-on: [self-hosted] + timeout-minutes: 10 + steps: + - uses: actions/checkout@v4 - name: Analysing the code with pylint run: | - pylint --rcfile=.pylintrc --output-format=parseable --jobs=8 $( find llumnix/ -type f -name '*.py') + nvidia-docker run --rm -t --net host --ipc host \ + -v ${PWD}:/workspace \ + -w /workspace \ + registry.cn-beijing.aliyuncs.com/llumnix/llumnix-dev:20240909_action_678a439 \ + bash -c "pip install -e . > /dev/null && make lint" diff --git a/.github/workflows/unit.yml b/.github/workflows/unit.yml new file mode 100644 index 00000000..93dad59c --- /dev/null +++ b/.github/workflows/unit.yml @@ -0,0 +1,31 @@ +name: UNIT + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + cancel_previous_workflows: + runs-on: [self-hosted] + timeout-minutes: 3 + steps: + - uses: styfle/cancel-workflow-action@0.12.1 + with: + all_but_latest: true + + unit_tests: + needs: cancel_previous_workflows + runs-on: [self-hosted] + timeout-minutes: 60 + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Kill Running Containers + run: | + [[ -n $(docker ps -q) ]] && docker kill $(docker ps -q) || echo "No running containers to kill." + - name: Build And Test + run: ./tools/unit_test.sh diff --git a/.github/workflows/whl.yml b/.github/workflows/whl.yml new file mode 100644 index 00000000..c70b26db --- /dev/null +++ b/.github/workflows/whl.yml @@ -0,0 +1,26 @@ +name: WHL_BUILD + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + whl_build: + runs-on: ubuntu-latest + timeout-minutes: 10 + + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.10" + - name: Build whl + run: | + python3 -m pip install --upgrade setuptools wheel + python3 setup.py bdist_wheel --universal diff --git a/Makefile b/Makefile index 75255e12..b5bb2a21 100644 --- a/Makefile +++ b/Makefile @@ -21,15 +21,15 @@ install: .PHONY: lint lint: check_pylint_installed check_pytest_installed - @pylint --rcfile=.pylintrc -s n ./llumnix + @pylint --rcfile=.pylintrc -s n --jobs=32 ./llumnix @pylint --rcfile=.pylintrc \ --disable=protected-access,super-init-not-called,unused-argument,redefined-outer-name,invalid-name \ - -s n ./tests + -s n --jobs=32 ./tests .PHONY: test test: check_pytest_installed - @pytest -x -q --ignore=third_party/ --disable-warnings + @pytest -x -v --ignore=third_party/ --ignore=tests/e2e_test --disable-warnings #################### pygloo install for gloo migration backend begin #################### diff --git a/conftest.py b/conftest.py new file mode 100644 index 00000000..56f33116 --- /dev/null +++ b/conftest.py @@ -0,0 +1,14 @@ +# conftest.py +import subprocess +from time import sleep + +def pytest_sessionstart(session): + subprocess.run(["ray", "stop", "--force"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + sleep(3) + subprocess.run(["ray", "start", "--head", "--disable-usage-stats", "--port=30050"], check=True, + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + sleep(3) + +def pytest_sessionfinish(session, exitstatus): + subprocess.run(["ray", "stop"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + sleep(3) diff --git a/llumnix/backends/vllm/llm_engine.py b/llumnix/backends/vllm/llm_engine.py index 91406073..782aa820 100644 --- a/llumnix/backends/vllm/llm_engine.py +++ b/llumnix/backends/vllm/llm_engine.py @@ -166,6 +166,7 @@ def step(self) -> None: instance_info.step_id = next(self.step_counter) instance_info.timestamp = time.time() instance_info.latency = self.model_executor.last_inference_latency + seq_groups = self.scheduler.running if seq_groups: tot_blocks = [] @@ -257,8 +258,8 @@ def commit_dst_request(self, backend_request: SequenceGroupLlumnix) -> None: logger.info("add seq {} to block table".format(seq.seq_id)) pre_alloc_blocks = self.engine.scheduler.pre_alloc_cache_dict.pop(backend_request.request_id) self.engine.scheduler.block_manager.add_block_table(pre_alloc_blocks, seq.seq_id) - self.add_running_request(backend_request) backend_request.reset_migration_args() + self.add_running_request(backend_request) def send_blocks(self, dst_ray_actor: "ray.actor.ActorHandle", src_blocks: List[int], dst_blocks: List[int]) -> None: ray.get(dst_ray_actor.execute_engine_method.remote("_run_workers", diff --git a/llumnix/backends/vllm/worker.py b/llumnix/backends/vllm/worker.py index d581b2ce..91168d28 100644 --- a/llumnix/backends/vllm/worker.py +++ b/llumnix/backends/vllm/worker.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from typing import Dict, List import math import ray @@ -26,6 +27,7 @@ from llumnix.backends.vllm.utils import _sample_with_torch from llumnix.backends.vllm.migration_backend import MigrationBackendBase, get_migration_backend from llumnix.internal_config import MigrationConfig +from llumnix.utils import convert_bytes logger = init_logger(__name__) @@ -93,6 +95,7 @@ def init_migration(self, instance_id: str, migration_config: MigrationConfig, sr self.instance_id = instance_id self.global_world_size = 0 self.global_rank = -1 + # self.migration_config = migration_config self.migration_backend: MigrationBackendBase = get_migration_backend(migration_config=migration_config, cache_engine=self.cache_engine, worker_handle_list=src_worker_handle_list, @@ -104,10 +107,19 @@ def init_migration(self, instance_id: str, migration_config: MigrationConfig, sr def migrate_cache(self, src_worker_handle_list, src_blocks: List[int], dst_blocks: List[int]) -> None: src_worker_handle = src_worker_handle_list[self.rank] + + start_time = time.time() try: self.migration_backend.migrate_cache(src_worker_handle, src_blocks, dst_blocks) except ray.exceptions.RayActorError: logger.info("[migrate_cache] self.rank: {}, src_worker_handle {} is dead".format(self.rank, src_worker_handle)) + end_time = time.time() + + total_kv_cache_size = len(src_blocks) * CacheEngine.get_cache_block_size( + self.cache_config, self.model_config, self.parallel_config) + speed = total_kv_cache_size/1024/1024/1024/(end_time - start_time) + logger.info("[migration_cache] blocks_num: {}, total_kv_cache_size: {}, time: {}s, speed: {}GB/s." + .format(len(src_blocks), convert_bytes(total_kv_cache_size), end_time-start_time, speed)) def do_recv(self, *args, **kwargs): return self.migration_backend.do_recv(*args, **kwargs) diff --git a/llumnix/entrypoints/llumnix_utils.py b/llumnix/entrypoints/llumnix_utils.py index 0502f97f..52d2d10e 100644 --- a/llumnix/entrypoints/llumnix_utils.py +++ b/llumnix/entrypoints/llumnix_utils.py @@ -17,6 +17,7 @@ import time from typing import List, Tuple import asyncio +import socket import ray from llumnix.llm_engine_manager import LLMEngineManager, MANAGER_ACTOR_NAME @@ -38,10 +39,9 @@ MAX_TASK_RETRIES = 300 RETRIES_INTERVALS = 0.1 - def get_ip_address(): - result = subprocess.run(['hostname', '-i'], stdout=subprocess.PIPE, check=True) - ip_address = result.stdout.decode('utf-8').strip() + hostname = socket.gethostname() + ip_address = socket.gethostbyname(hostname) return ip_address def launch_ray_cluster(ray_cluster_port: int) -> subprocess.CompletedProcess: diff --git a/llumnix/llumlet/llumlet.py b/llumnix/llumlet/llumlet.py index ba576aec..5ad3963f 100644 --- a/llumnix/llumlet/llumlet.py +++ b/llumnix/llumlet/llumlet.py @@ -116,6 +116,7 @@ def migrate_out(self, dst_instance_name: str) -> List[str]: migrate_out_request.stage_timestamps.append(time.time()) self.backend_engine.remove_migrating_out_request_last_stage(migrate_out_request) else: + migrate_out_request.reset_migration_args() ray.get(migrate_in_ray_actor.execute_migration_method.remote("free_dst_pre_alloc_cache", migrate_out_request.request_id)) t1 = time.time() logger.info("{}->{} migrate done, migrate request {}, status:{}, len:{} blocks, cost:{} ms" \ diff --git a/llumnix/utils.py b/llumnix/utils.py index 06f4fdbb..f17ef6c2 100644 --- a/llumnix/utils.py +++ b/llumnix/utils.py @@ -16,3 +16,17 @@ def random_uuid() -> str: return str(uuid.uuid4().hex) + +def convert_bytes(bytes_size): + """Convert bytes to KB, MB, GB, etc.""" + if bytes_size < 0: + raise ValueError("Size must be a non-negative integer.") + + size_suffixes = ['B', 'KB', 'MB', 'GB', 'TB'] + index = 0 + + while bytes_size >= 1024 and index < len(size_suffixes) - 1: + bytes_size /= 1024.0 + index += 1 + + return f"{bytes_size:.2f} {size_suffixes[index]}" diff --git a/pytest.ini b/pytest.ini index 6a7d1706..0102b0a9 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,2 @@ [pytest] -asyncio_default_fixture_loop_scope = function \ No newline at end of file +asyncio_default_fixture_loop_scope = function diff --git a/requirements.txt b/requirements.txt index 64697ad6..25cf07ec 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ func_timeout pyyaml yacs numpy < 1.24.0 # for gloo migration backend's compatibility with numpy.float +pyzmq diff --git a/tests/e2e_test/__init__.py b/tests/e2e_test/__init__.py new file mode 100644 index 00000000..f1ced327 --- /dev/null +++ b/tests/e2e_test/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# TODO(KuilongCui): add failover test diff --git a/tests/e2e_test/test_bench.py b/tests/e2e_test/test_bench.py new file mode 100644 index 00000000..9bebbac9 --- /dev/null +++ b/tests/e2e_test/test_bench.py @@ -0,0 +1,136 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import json +import os +import subprocess +import pytest +import ray +import torch +import numpy as np + +from .test_e2e import generate_launch_command +from .utils import to_markdown_table + +def launch_llumnix_service(command): + subprocess.run(command, shell=True, check=True) + +def generate_bench_command(ip_ports: str, model: str, num_prompts: int, dataset_type: str, dataset_path: str, + qps: int, results_filename: str = "", query_distribution: str = "poisson", + coefficient_variation: float = 1.0, priority_ratio: float = 0.0): + command = ( + f"python -u ./benchmark/benchmark_serving.py " + f"--ip_ports {ip_ports} " + f"--backend vLLM " + f"--tokenizer {model} " + f"--trust_remote_code " + f"--log_filename bench_{ip_ports.split(':')[1]} " + f"--random_prompt_count {num_prompts} " + f"--dataset_type {dataset_type} " + f"--dataset_path {dataset_path} " + f"--qps {qps} " + f"--distribution {query_distribution} " + f"--coefficient_variation {coefficient_variation} " + f"--priority_ratio {priority_ratio} " + f"--log_latencies " + f"--fail_on_response_failure " + f"{'> bench_'+results_filename if len(results_filename)> 0 else ''}" + ) + return command + +def shutdown_llumnix_service(): + try: + subprocess.run('pkill -f llumnix.entrypoints.vllm.api_server', shell=True, check=True) + # pylint: disable=broad-except + except Exception: + pass + +def clear_ray_state(): + named_actors = ray.util.list_named_actors(True) + for actor in named_actors: + try: + actor_handle = ray.get_actor(actor['name'], namespace=actor['namespace']) + # pylint: disable=bare-except + except: + continue + + try: + ray.kill(actor_handle) + # pylint: disable=bare-except + except: + continue + ray.shutdown() + +def parse_log_file(): + json_files = [f for f in os.listdir('.') if f.endswith('_latency_info.json')] + + decode_latencies = [] + + for json_file in json_files: + with open(json_file, 'r', encoding="utf-8") as file: + data = json.load(file)[0] + + decode_latencies.append(data.get('decode_latencies', [])) + + latencies_array = np.array(decode_latencies) + + p25 = np.percentile(latencies_array, 25) + p50 = np.percentile(latencies_array, 50) + p75 = np.percentile(latencies_array, 75) + p95 = np.percentile(latencies_array, 95) + p99 = np.percentile(latencies_array, 99) + mean = np.mean(latencies_array) + + data = [ + ["decode", "p25", "p50", "p75", "p95", "p99", "mean"], + ["latency(ms)", f"{p25:.2f}", f"{p50:.2f}", f"{p75:.2f}", f"{p95:.2f}", f"{p99:.2f}", f"{mean:.2f}"] + ] + + return to_markdown_table(data) + +@pytest.mark.asyncio +@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="at least 1 gpus required for simple benchmark") +@pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B']) +async def test_simple_benchmark(model): + device_count = torch.cuda.device_count() + base_port = 37037 + for i in range(device_count): + launch_command = generate_launch_command(result_filename=str(base_port+i)+".out", + launch_ray_cluster=False, port=base_port+i, model=model) + subprocess.run(launch_command, shell=True, check=True) + + await asyncio.sleep(60) + + async def run_bench_command(command): + process = await asyncio.create_subprocess_shell(command) + await process.wait() + assert process.returncode == 0 + + tasks = [] + for i in range(device_count): + bench_command = generate_bench_command(ip_ports=f"127.0.0.1:{base_port+i}", model=model, num_prompts=300, + dataset_type="sharegpt", + dataset_path="/mnt/dataset/sharegpt_gpt4/sharegpt_gpt4.jsonl" , + qps=30, + results_filename=f"{base_port+i}.out") + tasks.append(run_bench_command(bench_command)) + + await asyncio.wait(tasks, timeout=60*30) + + with open("performance.txt", "w", encoding="utf-8") as f: + f.write(parse_log_file()) + + shutdown_llumnix_service() + clear_ray_state() + await asyncio.sleep(3) diff --git a/tests/e2e_test/test_e2e.py b/tests/e2e_test/test_e2e.py new file mode 100644 index 00000000..c19d581c --- /dev/null +++ b/tests/e2e_test/test_e2e.py @@ -0,0 +1,142 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess +import asyncio +import pytest +import aiohttp +import ray +import torch + +from vllm import LLM, SamplingParams + +def generate_launch_command(result_filename: str = "", launch_ray_cluster: bool = True, HEAD_NODE_IP: str = "127.0.0.1", + ip: str = "127.0.0.1", port: int = 37000, instances_num = 1, dispatch_policy: str = "load", + migration_backend = "rpc", model = "facebook/opt-125m", max_model_len: int = 2048): + command = ( + f"RAY_DEDUP_LOGS=0 HEAD_NODE_IP={HEAD_NODE_IP} HEAD_NODE=1 " + f"nohup python -m llumnix.entrypoints.vllm.api_server " + f"--host {ip} " + f"--port {port} " + f"--initial-instances {instances_num} " + f"--enable-migration " + f"--model {model} " + f"--engine-use-ray " + f"--worker-use-ray " + f"--max-model-len {max_model_len} " + f"--dispatch-policy {dispatch_policy} " + f"--trust-remote-code " + f"--request-migration-policy LCFS " + f"--migration-backend {migration_backend} " + f"--migration-cache-blocks 32 " + f"--tensor-parallel-size 1 " + f"--request-output-queue-port {1234+port} " + f"{'--launch-ray-cluster ' if launch_ray_cluster else ''}" + f"{'> instance_'+result_filename if len(result_filename)> 0 else ''} 2>&1 &" + ) + return command + +def launch_llumnix_service(model: str, max_model_len: int, port: int, migration_backend: str): + command = generate_launch_command(model=model, max_model_len=max_model_len, + port=port, migration_backend=migration_backend) + subprocess.run(command, shell=True, check=True) + +def shutdown_llumnix_service(): + try: + subprocess.run('pkill -f llumnix.entrypoints.vllm.api_server', shell=True, check=True) + # pylint: disable=broad-except + except Exception: + pass + +def clear_ray_state(): + named_actors = ray.util.list_named_actors(True) + for actor in named_actors: + try: + actor_handle = ray.get_actor(actor['name'], namespace=actor['namespace']) + # pylint: disable=bare-except + except: + continue + + try: + ray.kill(actor_handle) + # pylint: disable=bare-except + except: + continue + ray.shutdown() + +async def get_llumnix_responce(prompt, sampling_params, ip_ports): + timeout = aiohttp.ClientTimeout(total=60) + + request = { + "prompt": prompt, + "stream": False, + **sampling_params, + } + + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post(f'http://{ip_ports}/generate', json=request) as resp: + output = await resp.json() + return output + +prompts = [ + "Hello, my name is", + "The president of the United States is", + "The capital of France is", + "The future of AI is", +] + +@ray.remote(num_gpus=1) +def run_vllm(model, max_model_len, sampling_params): + vllm_output = {} + raw_vllm = LLM(model=model, trust_remote_code=True, max_model_len=max_model_len) + outputs = raw_vllm.generate(prompts, SamplingParams(**sampling_params)) + + for output in outputs: + vllm_output[output.prompt] = output.prompt + output.outputs[0].text + + return vllm_output + +@pytest.mark.asyncio +@pytest.mark.skipif(torch.cuda.device_count() < 1, reason="at least 1 gpus required for e2e test") +@pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B']) +@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo', 'nccl']) +async def test_e2e(model, migration_backend): + max_model_len = 370 + sampling_params = { + "n": 1, + "best_of": 1, + "use_beam_search": False, + "temperature": 0.0, + "top_k": 1, + "ignore_eos": False, + } + + # generate llumnix outputs + base_port = 37037 + launch_llumnix_service(model, max_model_len, base_port, migration_backend) + await asyncio.sleep(60) + + llumnix_output = {} + for prompt in prompts: + response = await asyncio.wait_for(get_llumnix_responce(prompt, sampling_params, f"127.0.0.1:{base_port}"), + timeout=60*5) + llumnix_output[prompt] = response['text'][0] + + shutdown_llumnix_service() + + vllm_output = ray.get(run_vllm.remote(model, max_model_len, sampling_params)) + clear_ray_state() + + # compare + for prompt in prompts: + assert llumnix_output[prompt] == vllm_output[prompt] diff --git a/tests/e2e_test/test_migration.py b/tests/e2e_test/test_migration.py new file mode 100644 index 00000000..ef2c6da7 --- /dev/null +++ b/tests/e2e_test/test_migration.py @@ -0,0 +1,95 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +from collections import defaultdict +import re +import subprocess +import pytest +import torch + +from .test_e2e import generate_launch_command +from .test_bench import generate_bench_command, clear_ray_state, shutdown_llumnix_service +from .utils import to_markdown_table + +size_pattern = re.compile(r'total_kv_cache_size:\s*([\d.]+)\s*(B|KB|MB|GB|KB|TB)') +speed_pattern = re.compile(r'speed:\s*([\d.]+)GB/s') + +def parse_log_file(log_files): + speed_dict = defaultdict(list) + + for log_file in log_files: + with open(log_file, 'r', encoding="utf-8") as file: + for line in file: + size_match = size_pattern.search(line) + speed_match = speed_pattern.search(line) + + if size_match and speed_match: + total_kv_cache_size = size_match.group(0).split(": ")[1].strip() + speed = float(speed_match.group(1)) + speed_dict[total_kv_cache_size].append(speed) + + averger_speed = {} + for transfer_size, speeds in speed_dict.items(): + if len(speeds) <= 2: + continue + + speeds.sort() + trimmed_speeds = speeds[1:-1] + averger_speed[transfer_size] = sum(trimmed_speeds) / len(trimmed_speeds) + + assert len(averger_speed) > 0, "Migration should have occurred, but it was not detected. " + + return averger_speed + +@pytest.mark.asyncio +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="at least 2 gpus required for migration bench") +@pytest.mark.parametrize("model", ['/mnt/model/Qwen-7B']) +@pytest.mark.parametrize("migration_backend", ['rpc', 'gloo', 'nccl']) +async def test_migration_benchmark(model, migration_backend): + base_port = 37037 + instance_output_logs = [] + + device_count = 2 + for i in range(device_count): + output_log = f"{base_port+i}.out" + instance_output_logs.append("instance_"+output_log) + launch_command = generate_launch_command(result_filename=output_log, launch_ray_cluster=False, port=base_port+i, + model=model, dispatch_policy="flood", migration_backend=migration_backend) + subprocess.run(launch_command, shell=True, check=True) + await asyncio.sleep(60) + + async def run_bench_command(command): + process = await asyncio.create_subprocess_shell(command) + await process.wait() + assert process.returncode == 0 + + bench_command = generate_bench_command(ip_ports=f"127.0.0.1:{base_port}", model=model, num_prompts=300, + dataset_type="sharegpt", + dataset_path="/mnt/dataset/sharegpt_gpt4/sharegpt_gpt4.jsonl" , + qps=30) + await asyncio.wait_for(run_bench_command(bench_command), timeout=60*30) + + averger_speed = parse_log_file(instance_output_logs) + + data = [ + ['migration_size'] + list(averger_speed.keys()), + [f'{migration_backend}_speed(GB/s)'] + [f"{value:.2f}" for value in averger_speed.values()] + ] + + with open("performance.txt", "a", encoding="utf-8") as f: + f.write(to_markdown_table(data)) + + shutdown_llumnix_service() + clear_ray_state() + await asyncio.sleep(3) diff --git a/tests/e2e_test/utils.py b/tests/e2e_test/utils.py new file mode 100644 index 00000000..62d9bff8 --- /dev/null +++ b/tests/e2e_test/utils.py @@ -0,0 +1,29 @@ +# Copyright (c) 2024, Alibaba Group; +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def to_markdown_table(data): + headers = data[0] + rows = data[1:] + + col_widths = [max(len(str(item)) for item in col) for col in zip(*data)] + + header_row = " | ".join(f"{str(item):<{col_widths[i]}}" for i, item in enumerate(headers)) + separator_row = " | ".join('-' * col_widths[i] for i in range(len(headers))) + + data_rows = [] + for row in rows: + data_row = " | ".join(f"{str(item):<{col_widths[i]}}" for i, item in enumerate(row)) + data_rows.append(data_row) + + table = f"{header_row}\n{separator_row}\n" + "\n".join(data_rows) + "\n\n" + return table diff --git a/tests/__init__.py b/tests/unit_test/__init__.py similarity index 100% rename from tests/__init__.py rename to tests/unit_test/__init__.py diff --git a/tests/backends/__init__.py b/tests/unit_test/backends/__init__.py similarity index 100% rename from tests/backends/__init__.py rename to tests/unit_test/backends/__init__.py diff --git a/tests/backends/vllm/__init__.py b/tests/unit_test/backends/vllm/__init__.py similarity index 100% rename from tests/backends/vllm/__init__.py rename to tests/unit_test/backends/vllm/__init__.py diff --git a/tests/backends/vllm/test_llm_engine.py b/tests/unit_test/backends/vllm/test_llm_engine.py similarity index 100% rename from tests/backends/vllm/test_llm_engine.py rename to tests/unit_test/backends/vllm/test_llm_engine.py diff --git a/tests/backends/vllm/test_migration.py b/tests/unit_test/backends/vllm/test_migration.py similarity index 97% rename from tests/backends/vllm/test_migration.py rename to tests/unit_test/backends/vllm/test_migration.py index 4e22c166..dc582088 100644 --- a/tests/backends/vllm/test_migration.py +++ b/tests/unit_test/backends/vllm/test_migration.py @@ -24,9 +24,10 @@ from llumnix.internal_config import MigrationConfig from llumnix.llumlet.request import LlumnixRequest, RequestInferenceType -from tests.utils import setup_ray_env -from tests.rpc.test_queue import request_output_queue_server - +# pylint: disable=unused-import +from tests.unit_test.rpc.test_queue import request_output_queue_server +# pylint: disable=unused-import +from tests.unit_test.utils import setup_ray_env from .test_llm_engine import MockEngine from .utils import create_dummy_prompt diff --git a/tests/backends/vllm/test_migration_backend.py b/tests/unit_test/backends/vllm/test_migration_backend.py similarity index 93% rename from tests/backends/vllm/test_migration_backend.py rename to tests/unit_test/backends/vllm/test_migration_backend.py index 8169d15e..34f90266 100644 --- a/tests/backends/vllm/test_migration_backend.py +++ b/tests/unit_test/backends/vllm/test_migration_backend.py @@ -22,9 +22,9 @@ from llumnix.arg_utils import EngineManagerArgs from llumnix.utils import random_uuid -from tests.backends.vllm.test_worker import create_worker -from tests.utils import setup_ray_env - +# pylint: disable=unused-import +from tests.unit_test.utils import setup_ray_env +from .test_worker import create_worker class MockMigrationWorker(MigrationWorker): def set_gpu_cache(self, data): @@ -44,10 +44,10 @@ def test_migrate_cache(setup_ray_env, backend): migraiton_config.migration_backend = backend worker0 = create_worker(rank=0, local_rank=0, engine_config=engine_config, - worker_module_name="tests.backends.vllm.test_migration_backend", + worker_module_name="tests.unit_test.backends.vllm.test_migration_backend", worker_class_name="MockMigrationWorker") worker1 = create_worker(rank=0, local_rank=0, engine_config=engine_config, - worker_module_name="tests.backends.vllm.test_migration_backend", + worker_module_name="tests.unit_test.backends.vllm.test_migration_backend", worker_class_name="MockMigrationWorker") ray.get(worker0.execute_method.remote('init_device')) diff --git a/tests/backends/vllm/test_scheduler.py b/tests/unit_test/backends/vllm/test_scheduler.py similarity index 100% rename from tests/backends/vllm/test_scheduler.py rename to tests/unit_test/backends/vllm/test_scheduler.py diff --git a/tests/backends/vllm/test_worker.py b/tests/unit_test/backends/vllm/test_worker.py similarity index 94% rename from tests/backends/vllm/test_worker.py rename to tests/unit_test/backends/vllm/test_worker.py index fc3f14c9..a48568ec 100644 --- a/tests/backends/vllm/test_worker.py +++ b/tests/unit_test/backends/vllm/test_worker.py @@ -14,7 +14,6 @@ import pytest import torch import ray -from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy from vllm.engine.arg_utils import EngineArgs from vllm.utils import get_distributed_init_method, get_ip, get_open_port @@ -25,20 +24,15 @@ from llumnix.arg_utils import EngineManagerArgs from llumnix.utils import random_uuid -from tests.utils import setup_ray_env +# pylint: disable=unused-import +from tests.unit_test.utils import setup_ray_env def create_worker(rank: int, local_rank: int, engine_config: EngineConfig, worker_module_name="llumnix.backends.vllm.worker", worker_class_name="MigrationWorker"): - scheduling_strategy = NodeAffinitySchedulingStrategy( - node_id=ray.get_runtime_context().get_node_id(), - soft=False, - ) - worker = ray.remote( num_cpus=0, - num_gpus=1, - scheduling_strategy=scheduling_strategy + num_gpus=1 )(RayWorkerWrapper).remote( worker_module_name=worker_module_name, worker_class_name=worker_class_name, diff --git a/tests/backends/vllm/utils.py b/tests/unit_test/backends/vllm/utils.py similarity index 100% rename from tests/backends/vllm/utils.py rename to tests/unit_test/backends/vllm/utils.py diff --git a/tests/entrypoints/__init__.py b/tests/unit_test/entrypoints/__init__.py similarity index 100% rename from tests/entrypoints/__init__.py rename to tests/unit_test/entrypoints/__init__.py diff --git a/tests/entrypoints/test_llumnix_utils.py b/tests/unit_test/entrypoints/test_llumnix_utils.py similarity index 94% rename from tests/entrypoints/test_llumnix_utils.py rename to tests/unit_test/entrypoints/test_llumnix_utils.py index b71966b0..d3b52f56 100644 --- a/tests/entrypoints/test_llumnix_utils.py +++ b/tests/unit_test/entrypoints/test_llumnix_utils.py @@ -24,8 +24,10 @@ retry_manager_method_async) from llumnix.llm_engine_manager import MANAGER_ACTOR_NAME -from tests.utils import setup_ray_env -from tests.rpc.test_queue import init_server_info +from tests.unit_test.rpc.test_queue import init_server_info + +# pylint: disable=unused-import +from tests.unit_test.utils import setup_ray_env def test_launch_ray_cluster(): diff --git a/tests/entrypoints/vllm/__init__.py b/tests/unit_test/entrypoints/vllm/__init__.py similarity index 100% rename from tests/entrypoints/vllm/__init__.py rename to tests/unit_test/entrypoints/vllm/__init__.py diff --git a/tests/entrypoints/vllm/api_server_manager.py b/tests/unit_test/entrypoints/vllm/api_server_manager.py similarity index 98% rename from tests/entrypoints/vllm/api_server_manager.py rename to tests/unit_test/entrypoints/vllm/api_server_manager.py index 3bb7795c..88084e2e 100644 --- a/tests/entrypoints/vllm/api_server_manager.py +++ b/tests/unit_test/entrypoints/vllm/api_server_manager.py @@ -24,7 +24,7 @@ from llumnix.rpc.queue_client import QueueClient from llumnix.entrypoints.llumnix_utils import init_request_output_queue -from tests.rpc.test_queue import init_server_info +from tests.unit_test.rpc.test_queue import init_server_info app = llumnix.entrypoints.vllm.api_server.app diff --git a/tests/entrypoints/vllm/test_api_server.py b/tests/unit_test/entrypoints/vllm/test_api_server.py similarity index 97% rename from tests/entrypoints/vllm/test_api_server.py rename to tests/unit_test/entrypoints/vllm/test_api_server.py index 6794d952..e8c81ba9 100644 --- a/tests/entrypoints/vllm/test_api_server.py +++ b/tests/unit_test/entrypoints/vllm/test_api_server.py @@ -19,7 +19,8 @@ import pytest import requests -from tests.utils import setup_ray_env +# pylint: disable=unused-import +from tests.unit_test.utils import setup_ray_env def _query_server(prompt: str, max_tokens: int = 5, interface: str = 'generate') -> dict: @@ -59,6 +60,7 @@ def api_server(): # Waiting for api server subprocess to terminate. time.sleep(1.0) +# @pytest.mark.skip(reason="This test is flaky") @pytest.mark.parametrize("interface", ['generate', 'generate_benchmark']) def test_api_server(setup_ray_env, api_server, interface: str): """ diff --git a/tests/global_scheduler/__init__.py b/tests/unit_test/global_scheduler/__init__.py similarity index 100% rename from tests/global_scheduler/__init__.py rename to tests/unit_test/global_scheduler/__init__.py diff --git a/tests/global_scheduler/test_dispatch_scheduler.py b/tests/unit_test/global_scheduler/test_dispatch_scheduler.py similarity index 100% rename from tests/global_scheduler/test_dispatch_scheduler.py rename to tests/unit_test/global_scheduler/test_dispatch_scheduler.py diff --git a/tests/global_scheduler/test_global_scheduler.py b/tests/unit_test/global_scheduler/test_global_scheduler.py similarity index 96% rename from tests/global_scheduler/test_global_scheduler.py rename to tests/unit_test/global_scheduler/test_global_scheduler.py index 89e3d63c..af8e7dd7 100644 --- a/tests/global_scheduler/test_global_scheduler.py +++ b/tests/unit_test/global_scheduler/test_global_scheduler.py @@ -19,7 +19,7 @@ from llumnix.global_scheduler.global_scheduler import GlobalScheduler from llumnix.instance_info import InstanceInfo -from tests.global_scheduler.test_llm_engine_manager import get_instance_info_migrate_in, get_instance_info_migrate_out +from .test_llm_engine_manager import get_instance_info_migrate_in, get_instance_info_migrate_out def init_global_scheduler(): diff --git a/tests/global_scheduler/test_llm_engine_manager.py b/tests/unit_test/global_scheduler/test_llm_engine_manager.py similarity index 99% rename from tests/global_scheduler/test_llm_engine_manager.py rename to tests/unit_test/global_scheduler/test_llm_engine_manager.py index 527c71ac..6a17937d 100644 --- a/tests/global_scheduler/test_llm_engine_manager.py +++ b/tests/unit_test/global_scheduler/test_llm_engine_manager.py @@ -24,7 +24,8 @@ from llumnix.llm_engine_manager import LLMEngineManager, MANAGER_ACTOR_NAME from llumnix.instance_info import InstanceInfo -from tests.utils import setup_ray_env +# pylint: disable=unused-import +from tests.unit_test.utils import setup_ray_env @ray.remote(num_cpus=1, max_concurrency=4) diff --git a/tests/global_scheduler/test_migration_scheduler.py b/tests/unit_test/global_scheduler/test_migration_scheduler.py similarity index 100% rename from tests/global_scheduler/test_migration_scheduler.py rename to tests/unit_test/global_scheduler/test_migration_scheduler.py diff --git a/tests/llumlet/__init__.py b/tests/unit_test/llumlet/__init__.py similarity index 100% rename from tests/llumlet/__init__.py rename to tests/unit_test/llumlet/__init__.py diff --git a/tests/llumlet/test_local_migration_scheduler.py b/tests/unit_test/llumlet/test_local_migration_scheduler.py similarity index 100% rename from tests/llumlet/test_local_migration_scheduler.py rename to tests/unit_test/llumlet/test_local_migration_scheduler.py diff --git a/tests/llumlet/test_migration_coordinator.py b/tests/unit_test/llumlet/test_migration_coordinator.py similarity index 98% rename from tests/llumlet/test_migration_coordinator.py rename to tests/unit_test/llumlet/test_migration_coordinator.py index 86430894..87905686 100644 --- a/tests/llumlet/test_migration_coordinator.py +++ b/tests/unit_test/llumlet/test_migration_coordinator.py @@ -19,7 +19,8 @@ from llumnix.backends.backend_interface import BackendInterface from llumnix.llumlet.llumlet import MigrationStatus -from tests.utils import setup_ray_env +# pylint: disable=unused-import +from tests.unit_test.utils import setup_ray_env from .test_local_migration_scheduler import MockRequest diff --git a/tests/rpc/__init__.py b/tests/unit_test/rpc/__init__.py similarity index 100% rename from tests/rpc/__init__.py rename to tests/unit_test/rpc/__init__.py diff --git a/tests/rpc/test_queue.py b/tests/unit_test/rpc/test_queue.py similarity index 96% rename from tests/rpc/test_queue.py rename to tests/unit_test/rpc/test_queue.py index ed12cc6a..3afd6339 100644 --- a/tests/rpc/test_queue.py +++ b/tests/unit_test/rpc/test_queue.py @@ -22,17 +22,16 @@ from llumnix.rpc.queue_client import QueueClient from llumnix.rpc.utils import get_open_zmq_ipc_path from llumnix.utils import random_uuid -from llumnix.entrypoints.llumnix_utils import get_ip_address from llumnix.server_info import ServerInfo from llumnix.entrypoints.llumnix_utils import init_request_output_queue # pylint: disable=W0611 -from tests.utils import setup_ray_env +from tests.unit_test.utils import setup_ray_env def init_server_info(): server_id = random_uuid() - ip = get_ip_address() + ip = '127.0.0.1' port = 1234 server_info = ServerInfo(server_id, ip, port) return server_info @@ -125,6 +124,6 @@ async def benchmark_queue(qps, ip=None, port=None): @pytest.mark.asyncio @pytest.mark.parametrize("qps", [128.0, 256.0, 512.0, 1024.0]) async def test_queue_zeromq(setup_ray_env, qps): - ip = get_ip_address() + ip = '127.0.0.1' port = 1234 await benchmark_queue(qps, ip, port) diff --git a/tests/utils.py b/tests/unit_test/utils.py similarity index 99% rename from tests/utils.py rename to tests/unit_test/utils.py index 352b3919..d49f5fd6 100644 --- a/tests/utils.py +++ b/tests/unit_test/utils.py @@ -14,7 +14,6 @@ import ray import pytest - @pytest.fixture def setup_ray_env(): ray.init(namespace="llumnix", ignore_reinit_error=True) diff --git a/tools/bench_test.sh b/tools/bench_test.sh new file mode 100755 index 00000000..2ef67d68 --- /dev/null +++ b/tools/bench_test.sh @@ -0,0 +1,6 @@ +#!/bin/bash +set -ex + +nvidia-docker run --rm -t --net host --ipc host -v ${PWD}:/workspace -v /mnt:/mnt -w /workspace \ + registry.cn-beijing.aliyuncs.com/llumnix/llumnix-dev:20240909_action_678a439 \ + bash -c "pip install -e . > /dev/null && pytest -v ./tests/e2e_test/test_bench.py" diff --git a/tools/e2e_test.sh b/tools/e2e_test.sh new file mode 100755 index 00000000..b0364465 --- /dev/null +++ b/tools/e2e_test.sh @@ -0,0 +1,6 @@ +# #!/bin/bash +set -ex + +nvidia-docker run --rm -t --net host --ipc host -v ${PWD}:/workspace -v /mnt:/mnt -w /workspace \ + registry.cn-beijing.aliyuncs.com/llumnix/llumnix-dev:20240909_action_678a439 \ + bash -c "pip install -e . > /dev/null && pytest -v ./tests/e2e_test/test_e2e.py" diff --git a/tools/image_build.sh b/tools/image_build.sh new file mode 100755 index 00000000..cd858c3e --- /dev/null +++ b/tools/image_build.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +if [ -z "$1" ]; then + echo "Usage: $0 " + exit 1 +fi + +NAMESPACE="$1" +REPO="registry.cn-beijing.aliyuncs.com/llumnix/${NAMESPACE}" + +DATE=$(date +%Y%m%d) +BRANCH=$(git rev-parse --abbrev-ref HEAD) +COMMIT_ID=$(git rev-parse --short=7 HEAD) +TAG="${DATE}_${BRANCH}_${COMMIT_ID}" + +# Get the Git user email +USER_EMAIL=$(git config user.email) + +if [ -z "$USER_EMAIL" ] || [ "${#USER_EMAIL}" -le 0 ]; then + echo "Error: Git user email is not set or empty. Please set it using 'git config user.email'" + exit 1 +fi + +# Ask for the container ID +echo "Please enter the container ID or name you want to commit:" +read CONTAINER_ID + +if ! docker inspect -f '{{.ID}}' "$CONTAINER_ID" &> /dev/null; then + echo "Error: The container '$CONTAINER_ID' does not exist." + exit 1 +fi + +# Display the details about the commit +echo "Preparing to commit the following container:" +echo "Container ID: $CONTAINER_ID" +echo "Image TAG: ${REPO}:${TAG}" +echo "Using Git user email: $USER_EMAIL" + +# Confirm the commit action +read -p "Do you want to proceed with the commit? (y/n): " -n 1 -r +echo # move to a new line +if [[ $REPLY != "y" ]]; then + echo "Commit aborted." + exit 1 +fi + +# Commit the container with an optional message and author +docker commit -a "${USER_EMAIL}" "${CONTAINER_ID}" "${REPO}:${TAG}" + +if [ $? -eq 0 ]; then + echo "Image committed successfully: ${REPO}:${TAG}" +else + echo "Image commit failed!" + exit 1 +fi diff --git a/tools/migration_test.sh b/tools/migration_test.sh new file mode 100755 index 00000000..244ac3cd --- /dev/null +++ b/tools/migration_test.sh @@ -0,0 +1,6 @@ +# #!/bin/bash +set -ex + +nvidia-docker run --rm -t --net host --ipc host -v ${PWD}:/workspace -v /mnt:/mnt -w /workspace \ + registry.cn-beijing.aliyuncs.com/llumnix/llumnix-dev:20240909_action_678a439 \ + bash -c "pip install -e . > /dev/null && pytest -v ./tests/e2e_test/test_migration.py" diff --git a/tools/unit_test.sh b/tools/unit_test.sh new file mode 100755 index 00000000..3a20a309 --- /dev/null +++ b/tools/unit_test.sh @@ -0,0 +1,6 @@ +# #!/bin/bash +set -ex + +nvidia-docker run --rm -t --net host --ipc host -v ${PWD}:/workspace -v /mnt:/mnt -w /workspace \ + registry.cn-beijing.aliyuncs.com/llumnix/llumnix-dev:20240909_action_678a439 \ + bash -c "pip install -e . > /dev/null && make test"