Commit

[WIP] adapt to bladellm
Xinyi-ECNU authored and KuilongCui committed Dec 16, 2024
1 parent 4029b00 commit ad6bcce
Showing 65 changed files with 4,919 additions and 176 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -1,3 +1,7 @@
# Proto files
*_pb2.py
*_pb2_grpc.py

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
26 changes: 26 additions & 0 deletions Makefile
@@ -27,6 +27,30 @@ lint: check_pylint_installed check_pytest_installed
--disable=protected-access,super-init-not-called,unused-argument,redefined-outer-name,invalid-name \
-s n --jobs=128 ./tests

.PHONY: clean
clean: proto-clean

###################################### proto begin ######################################

.PHONY: proto
proto:
@find . -type d -name "proto" | while read dir; do \
dir_base=$$(dirname $$dir); \
find $$dir -name "*.proto" | while read proto_file; do \
echo "Compiling $$proto_file"; \
PYTHONWARNINGS="ignore::DeprecationWarning" python -m grpc_tools.protoc --proto_path=. --python_out=. --grpc_python_out=. $$proto_file; \
done; \
done;

.PHONY: proto-clean
proto-clean:
@find . -name "*_pb2_grpc.py" | xargs rm -f
@find . -name "*_pb2.py" | xargs rm -f

####################################### proto end #######################################

###################################### test begin #######################################

.PHONY: test
test: check_pytest_installed
@pytest -v --ignore=third_party/ --ignore=tests/e2e_test --disable-warnings
@@ -55,6 +79,8 @@ bench_test:
migration_test:
@pytest -v -x -s --tb=long ./tests/e2e_test/test_migration.py

####################################### test end ########################################

#################### pygloo install for gloo migration backend begin ####################

BAZEL_CMD = bazel
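The proto rule above shells out to grpc_tools.protoc once per .proto file found under any proto/ directory; the generated *_pb2.py and *_pb2_grpc.py stubs are exactly what the new .gitignore entries exclude, and proto-clean (also reached via make clean) deletes them again. As a minimal sketch, the equivalent manual invocation for a hypothetical file llumnix/proto/migration.proto would be:

PYTHONWARNINGS="ignore::DeprecationWarning" python -m grpc_tools.protoc \
    --proto_path=. --python_out=. --grpc_python_out=. \
    llumnix/proto/migration.proto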
86 changes: 86 additions & 0 deletions bench.py
@@ -0,0 +1,86 @@
#!/usr/bin/env python

import asyncio
import json
import time
from typing import List

from websockets.sync.client import connect

from blade_llm.protocol import (
GenerateRequest,
GenerateStreamResponse,
SamplingParams,
StoppingCriteria,
)

port = 8081

finish = 0

async def hello(max_new_tokens, ignore_eos):
headers = {
# "Authorization": "<You may need this header for EAS."
}
url = f"ws://22.3.131.34:{port}/generate_stream"
with connect(url, additional_headers=headers) as websocket:
import random
prompts = [f"what's {random.randint(a=0, b=1000000)} plus {random.randint(a=0, b=1000000)}?"]
for p in prompts:
print(f"Prompt : {p}")
req = GenerateRequest(
prompt=p,
sampling_params=SamplingParams(
temperature=-0.9,
top_p=0.9,
top_k=0,
),
stopping_criterial=StoppingCriteria(max_new_tokens=max_new_tokens, ignore_eos=ignore_eos),
)
websocket.send(req.model_dump_json())
texts = []
idx = 0
global finish
while True:
await asyncio.sleep(0)
msg = websocket.recv()
resp = GenerateStreamResponse(**json.loads(msg))
texts.extend([t.text for t in resp.tokens])
idx += 1
for t in resp.tokens:
print(t.text, end="")
if resp.is_finished:
finish += 1
break
print(len(texts), idx)
print(f"{finish}, Generated text: {''.join(texts)}")
print("-" * 40)


async def get_range(n):
for i in range(n):
yield i

async def main():
tasks: List[asyncio.Task] = []
num_requests = 500
max_new_tokens = 20
ignore_eos = False
start = time.time()
async for i in get_range(num_requests):
await asyncio.sleep(0.001)
task = asyncio.create_task(hello(max_new_tokens, ignore_eos))
tasks.append(task)
await asyncio.gather(*tasks)
elapsed = time.time() - start
output_tps = max_new_tokens * num_requests / elapsed
print(f"Generate {output_tps} tokens/s")

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, help="The port number to use")
args = parser.parse_args()

port = args.port

asyncio.run(main())
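Assuming a BladeLLM api server is already listening on the chosen port, the script above would be launched as, for example, python bench.py --port 8081; it fires 500 concurrent generate_stream requests and reports the aggregate output tokens/s.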
40 changes: 40 additions & 0 deletions benchmark/benchmark_serving.py
@@ -68,6 +68,7 @@ async def async_request_gen(generator, qps: float, distribution="uniform", coeff

class GenerationBackend(str, Enum):
vLLM = "vLLM"
BladeLLM="BladeLLM"
NaiveHfPipeline = "NaiveHfPipeline"
RayGen = "RayGen"
FasterTransformer = "FasterTransformer"
@@ -116,6 +117,43 @@ async def query_model_vllm(prompt, verbose, ip_ports):
print(f"Connect to {ip_ports[server_id]} failed with: {str(e)}")
sys.exit(1)

async def query_model_bladellm(prompt, verbose, ip_ports):
prompt, prompt_len, expected_response_len = prompt

    # Dispatch the request to the api server with the fewest outstanding requests.
server_id = min(server_num_requests, key=server_num_requests.get)

timeout = aiohttp.ClientTimeout(total=4*60*60)

async with aiohttp.ClientSession(timeout=timeout) as session:
        # TODO(xinyi): Remove hard-coded sampling params.
output_len = expected_response_len
request_dict = {
"prompt": prompt,
"temperature": -0.9,
"top_p": 0.9,
"top_k": 1,
"max_new_tokens": max(output_len, 1),
"ignore_eos": True,
}
if verbose:
print('Querying model')
try:
async with session.post(f'http://{ip_ports[server_id]}/generate_benchmark', json=request_dict) as resp:
if verbose:
print('Done')

output = await resp.json()
# necessary for latency calc
output['response_len'] = expected_response_len
if verbose and 'generated_text' in output:
print(json.dumps(output['generated_text']))

return (prompt, output)
except aiohttp.ClientError as e:
print(f"Connect to {ip_ports[server_id]} failed with: {str(e)}")
sys.exit(1)

def load_prompts(prompt_file):
with open(prompt_file) as f:
prompts = [json.loads(l) for l in f.readlines()]
@@ -430,6 +468,8 @@ async def benchmark(

if backend == GenerationBackend.vLLM:
query_model = query_model_vllm
elif backend == GenerationBackend.BladeLLM:
query_model = query_model_bladellm
else:
raise ValueError(f'unknown backend {backend}')

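For reference, the JSON payload that query_model_bladellm posts to /generate_benchmark can also be exercised standalone. A minimal sketch with aiohttp, where the server address and prompt are placeholder assumptions:

import asyncio
import json

import aiohttp

async def main():
    # Placeholder address; point this at a running BladeLLM api server.
    url = "http://127.0.0.1:8081/generate_benchmark"
    request_dict = {
        "prompt": "what's 2 plus 2?",
        "temperature": -0.9,
        "top_p": 0.9,
        "top_k": 1,
        "max_new_tokens": 16,
        "ignore_eos": True,
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=request_dict) as resp:
            # The benchmark only assumes the response is JSON and may
            # contain a 'generated_text' field.
            print(json.dumps(await resp.json(), indent=2))

if __name__ == "__main__":
    asyncio.run(main())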
23 changes: 23 additions & 0 deletions configs/blade.yml
@@ -0,0 +1,23 @@
SERVER:
RAY_CLUSTER_PORT: 6379
LAUNCH_RAY_CLUSTER: False
QUEUE_TYPE: "rayqueue"


MANAGER:
DISABLE_FIXED_NODE_INIT_INSTANCE: False
DISABLE_INIT_INSTANCE_BY_MANAGER: True

LOAD_METRIC: 'remaining_steps'
DISPATCH_POLICY: 'load'

ENABLE_MIGRATION: False
ENABLE_DEFRAG: True
REQUEST_MIGRATION_POLICY: 'SJF'

MIGRATION_BACKEND: 'grpc'
MIGRATION_CACHE_BLOCKS: 512

ENABLE_SCALING: False

LOG_INSTANCE_INFO: True
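The keys in this sample appear to mirror the manager arguments documented in docs/Arguments.md below; for example, MIGRATION_BACKEND: 'grpc' corresponds to --migration-backend, and MIGRATION_CACHE_BLOCKS: 512 to --migration-buffer-blocks. Migration and scaling are left disabled here.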
22 changes: 19 additions & 3 deletions docs/Arguments.md
@@ -32,8 +32,11 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
[--profiling-result-file-path PROFILING_RESULT_FILE_PATH]
[--gpu-type GPU_TYPE]
[--polling-interval POLLING_INTERVAL]
[--migration-backend {gloo,nccl,rpc}]
[--migration-backend {gloo,nccl,rpc,grpc,kvtransfer}]
[--migration-buffer-blocks MIGRATION_BUFFER_BLOCKS]
[--migration-backend-transfer-type {cuda_ipc,rdma,}]
[--migration-backend-kvtransfer-naming-url MIGRATION_BACKEND_KVTRANSFER_NAMING_URL]
[--migration-backend-server-address MIGRATION_BACKEND_SERVER_ADDRESS]
[--migration-backend-init-timeout MIGRATION_BACKEND_INIT_TIMEOUT]
[--migration-num-layers MIGRATION_NUM_LAYERS]
[--last-stage-max-blocks LAST_STAGE_MAX_BLOCKS]
@@ -144,11 +147,24 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]

`--migration-backend`
- Communication backend of migration.
- Possible choices: gloo, rpc
- Possible choices: gloo, nccl, rpc, grpc, kvtransfer
- Default: "rpc"

`--migration-backend-transfer-type`
- Transfer type for the grpc and kvtransfer migration backends.
- Possible choices: cuda_ipc, rdma, ""
- Default: ""

`--migration-backend-server-address`
- Address of the grpc server for the migration backend.
- Default: "127.0.0.1:50051"

`--migration-backend-kvtransfer-naming-url`
- URL of the naming server for the kvtransfer migration backend.
- Default: ""

`--migration-buffer-blocks`
- Number of cache blocks in migration.
- Number of buffer blocks used during migration.
- Default: 512

`--migration-backend-init-timeout`
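Taken together, a command line enabling the new grpc backend might look like the following sketch (flag values are illustrative, and the model/serving arguments are omitted):

python -m llumnix.entrypoints.vllm.api_server \
    --migration-backend grpc \
    --migration-backend-transfer-type cuda_ipc \
    --migration-backend-server-address 127.0.0.1:50051 \
    --migration-buffer-blocks 512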
32 changes: 16 additions & 16 deletions llumnix/__init__.py
@@ -11,8 +11,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import vllm
from vllm import *
# import vllm
# from vllm import *

from llumnix.server_info import ServerInfo
from llumnix.entrypoints.setup import (launch_ray_cluster,
@@ -24,19 +24,19 @@
from llumnix.llumlet.llumlet import Llumlet
from llumnix.queue.queue_type import QueueType

from .version import __version__
# from .version import __version__

__all__ = [
"__version__",
"ServerInfo",
"launch_ray_cluster",
"connect_to_ray_cluster",
"init_manager",
"init_llumlets",
"EngineManagerArgs",
"LLMEngineManager",
"Llumlet",
"QueueType",
]
# __all__ = [
# "__version__",
# "ServerInfo",
# "launch_ray_cluster",
# "connect_to_ray_cluster",
# "init_manager",
# "init_llumlets",
# "EngineManagerArgs",
# "LLMEngineManager",
# "Llumlet",
# "QueueType",
# ]

__all__.extend(getattr(vllm, "__all__", []))
# __all__.extend(getattr(vllm, "__all__", []))
