Skip to content

Commit

Permalink
fix benchmark serving computation mistake (#630)
Browse files Browse the repository at this point in the history
* fix benchmark serving computation mistake

* fix timestamps computations

* remove speed up

* no mp

* mp seems faster?

* remove

* update

* remove

* fix

* update

* update print log

* typo

* print fist token latency only stream==True

* remove renew_session

* update AsyncEngine
  • Loading branch information
AllentDan authored Nov 8, 2023
1 parent 11d1093 commit 529e56b
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 125 deletions.
102 changes: 68 additions & 34 deletions benchmark/profile_restful_api.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,56 @@
import json
import multiprocessing as mp
import random
import time
from queue import Queue
from threading import Thread

import fire
import numpy as np

from lmdeploy.serve.openai.api_client import get_streaming_response
from lmdeploy.tokenizer import Tokenizer
from lmdeploy.utils import get_logger


def infer(server_addr: str, session_id: int, req_queue: mp.Queue,
res_que: mp.Queue):
def infer(server_addr: str, session_id: int, req_queue: Queue, res_que: Queue,
stream_output: bool):
stats = []
while not req_queue.empty():
prompt, input_seqlen, output_seqlen = req_queue.get()
get_logger('profile_restful_api').info(
f'request info: session {session_id}, '
f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}')
for prompt, input_seqlen, output_seqlen in iter(req_queue.get,
[None, None, None]):
if prompt is None:
break
timestamps = []
tokens = []
start = time.perf_counter()
timestamps.append(time.perf_counter())
for res, token, status in get_streaming_response(
prompt,
server_addr,
session_id,
request_output_len=output_seqlen,
interactive_mode=False):
interactive_mode=False,
ignore_eos=True,
stream=stream_output):
timestamps.append(time.perf_counter())
tokens.append(token)

first_token_latency = timestamps[1] - start
token_latency = timestamps[-1] - timestamps[0]
token = tokens[-1] - tokens[0]
stats.append([first_token_latency, token, token_latency])
first_token_latency = np.round(timestamps[1] - timestamps[0], 3)
token_latency = np.round(timestamps[-1] - timestamps[0], 3)
completion_tokens = tokens[-1]
total_tokens = tokens[-1] + input_seqlen
stats.append([
first_token_latency, completion_tokens, output_seqlen,
total_tokens, token_latency
])
print(f'session {session_id}: '
f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}, '
f'completion_tokens {completion_tokens}')
res_que.put((session_id, stats))


def warmup(server_addr: str,
concurrency: int,
output_seqlen: int,
warmup_round: int = 1):
warmup_round: int = 1,
stream_output: bool = False):
print('start to warmup ...')

def _infer(server_addr, session_id):
Expand All @@ -50,13 +59,15 @@ def _infer(server_addr, session_id):
server_addr,
session_id,
request_output_len=output_seqlen,
interactive_mode=False):
interactive_mode=False,
stream=stream_output,
ignore_eos=True):
continue

_start = time.perf_counter()
procs = []
for i in range(concurrency):
proc = mp.Process(target=_infer, args=(server_addr, i + 1))
proc = Thread(target=_infer, args=(server_addr, i + 1))
procs.append(proc)
proc.start()
for proc in procs:
Expand All @@ -79,6 +90,7 @@ def read_dataset(tokenizer_path: str, dataset_path: str, samples: int,
print(f'elapsed time for read data: '
f'{round(time.perf_counter() - start, 2)} s')

print('start tokenization. This takes a while, please wait...')
start = time.perf_counter()
tokenizer = Tokenizer(tokenizer_path)
prompts_token_lens = [len(tokenizer.encode(prompt)) for prompt in prompts]
Expand All @@ -100,9 +112,10 @@ def read_dataset(tokenizer_path: str, dataset_path: str, samples: int,
if samples > 0:
filtered_dataset = random.sample(filtered_dataset, samples)

que = mp.Queue()
que = Queue()
for data in filtered_dataset:
que.put(data)
que.put((None, None, None))
print(f'elapsed time for filtering: '
f'{round(time.perf_counter() - start, 2)} s')
return que, len(filtered_dataset)
Expand All @@ -113,17 +126,20 @@ def main(server_addr: str,
dataset_path: str,
concurrency: int = 1,
session_len: int = 2048,
samples: int = 1000):
samples: int = 1000,
stream_output: bool = False):
api_url = server_addr + '/v1/chat/interactive'
warmup(api_url, concurrency, session_len - 1)
warmup(api_url, concurrency, session_len - 1, 4, stream_output)
req_queue, n_req = read_dataset(tokenizer_path, dataset_path, samples,
session_len)
res_que = mp.Queue()
for i in range(concurrency):
req_queue.put([None, None, None])
res_que = Queue()
procs = []
_start = time.perf_counter()
for i in range(concurrency):
proc = mp.Process(target=infer,
args=(api_url, i + 1, req_queue, res_que))
proc = Thread(target=infer,
args=(api_url, i + 1, req_queue, res_que, stream_output))
procs.append(proc)
proc.start()
for proc in procs:
Expand All @@ -138,22 +154,40 @@ def main(server_addr: str,
f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n')
stats.append(np.array(_stats))

stats = np.concatenate(stats).reshape(-1, 3)
stats = np.concatenate(stats).reshape(-1, 5)

first_token_latency_min = np.min(stats[:, 0], axis=0)
first_token_latency_max = np.max(stats[:, 0], axis=0)
first_token_latency_ave = np.mean(stats[:, 0], axis=0)
token_throughput = np.sum(stats[:, 1], axis=0) / elapsed_time
req_throughput = n_req / elapsed_time
completion_tokens = np.sum(stats[:, 1], axis=0)
request_output_tokens = np.sum(stats[:, 2], axis=0)
total_tokens = np.sum(stats[:, 3], axis=0)
prompt_tokens = total_tokens - completion_tokens
completion_token_throughput = completion_tokens / elapsed_time
total_token_throughput = total_tokens / elapsed_time
rqs = n_req / elapsed_time
rqm = rqs * 60

if (np.abs(stats[:, 1] - stats[:, 2]) <= 1).min() is False:
print(f'Did not generate requested number of tokens. '
f'Request {request_output_tokens:.0f}, '
f'but got {completion_tokens:.0f}')

print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n'
f'elapsed_time: {elapsed_time:.2f}s\n'
f'first_token latency(min, max, ave): '
f'{first_token_latency_min:.2f}s, {first_token_latency_max:.2f}s, '
f'{first_token_latency_ave:.2f}s\n'
f'token throughput: {token_throughput:.2f} token/s\n'
f'req throughput: {req_throughput:.2f} req/s\n'
f'{"-" * 50}\n')
f'elapsed_time: {elapsed_time:.3f}s\n')
if stream_output:
print(f'first_token latency(min, max, ave): '
f'{first_token_latency_min:.3f}s, '
f'{first_token_latency_max:.3f}s, '
f'{first_token_latency_ave:.3f}s\n')
print(
f'number of prompt tokens: {prompt_tokens:.0f}\n'
f'number of completion tokens: {completion_tokens:.0f}\n'
f'token throughput (completion token): {completion_token_throughput:.3f} token/s\n' # noqa
f'token throughput (prompt + completion token): {total_token_throughput:.3f} token/s\n' # noqa
f'RPS (request per second): {rqs:.3f} req/s\n'
f'RPM (request per minute): {rqm:.3f} req/min\n'
f'{"-" * 50}\n')


if __name__ == '__main__':
Expand Down
65 changes: 43 additions & 22 deletions benchmark/profile_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def infer(chatbot, session_id: int, req_que: mp.Queue, res_que: mp.Queue):
[None, None, None]):
timestamps = []
tokens = []
start = time.perf_counter()
timestamps.append(time.perf_counter())
for status, res, token in chatbot.stream_infer(
session_id,
prompt,
Expand All @@ -26,13 +26,17 @@ def infer(chatbot, session_id: int, req_que: mp.Queue, res_que: mp.Queue):
sequence_end=True):
timestamps.append(time.perf_counter())
tokens.append(token)

first_token_latency = np.round(timestamps[1] - start, 3)
first_token_latency = np.round(timestamps[1] - timestamps[0], 3)
token_latency = np.round(timestamps[-1] - timestamps[0], 3)
token = tokens[-1] - tokens[0]
stats.append([first_token_latency, token, token_latency])
completion_tokens = tokens[-1]
total_tokens = tokens[-1] + input_seqlen
stats.append([
first_token_latency, completion_tokens, output_seqlen,
total_tokens, token_latency
])
print(f'session {session_id}: '
f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}')
f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}, '
f'completion_tokens {completion_tokens}')
res_que.put((session_id, stats))


Expand Down Expand Up @@ -84,6 +88,7 @@ def read_dataset(tokenizer_path: str, dataset_path: str, samples: int,
completions = [completion for _, completion in dataset]
print(f'elapsed time for read data: '
f'{round(time.perf_counter() - start, 2)} s')
print('start tokenization. This takes a while, please wait...')

start = time.perf_counter()
tokenizer = Tokenizer(tokenizer_path)
Expand Down Expand Up @@ -124,7 +129,6 @@ def main(tritonserver_addr: str,
res_que = mp.Queue()

procs = []
_start = time.perf_counter()
for i in range(concurrency):
chatbot = Chatbot(tritonserver_addr=tritonserver_addr,
display=False,
Expand All @@ -134,13 +138,15 @@ def main(tritonserver_addr: str,
proc = mp.Process(target=infer,
args=(chatbot, i + 1, req_que, res_que))
procs.append(proc)
proc.start()

# read data and put it to queue
n_req = read_dataset(tokenizer_path, dataset_path, samples, session_len,
req_que)
for i in range(concurrency):
req_que.put([None, None, None])
_start = time.perf_counter()
for proc in procs:
proc.start()

stats = []
for i in range(concurrency):
Expand All @@ -149,27 +155,42 @@ def main(tritonserver_addr: str,
f'session {session_id}: processed reqs {len(_stats)}, '
f'stats: \n{_stats}\n{"-" * 50}\n')
stats.append(np.array(_stats))

_end = time.perf_counter()

elapsed_time = _end - _start

stats = np.concatenate(stats).reshape(-1, 3)
stats = np.concatenate(stats).reshape(-1, 5)

first_token_latency_min = np.min(stats[:, 0], axis=0)
first_token_latency_max = np.max(stats[:, 0], axis=0)
first_token_latency_ave = np.mean(stats[:, 0], axis=0)
token_throughput = np.sum(stats[:, 1], axis=0) / elapsed_time
req_throughput = n_req / elapsed_time

print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n'
f'elapsed_time: {elapsed_time:.3f}s\n'
f'first_token latency(min, max, ave): '
f'{first_token_latency_min:.3f}s, {first_token_latency_max:.3f}s, '
f'{first_token_latency_ave:.3f}s\n'
f'token throughput: {token_throughput:.3f} token/s\n'
f'req throughput: {req_throughput:.3f} req/s\n'
f'{"-" * 50}\n')

completion_tokens = np.sum(stats[:, 1], axis=0)
request_output_tokens = np.sum(stats[:, 2], axis=0)
total_tokens = np.sum(stats[:, 3], axis=0)
prompt_tokens = total_tokens - completion_tokens
completion_token_throughput = completion_tokens / elapsed_time
total_token_throughput = total_tokens / elapsed_time
rqs = n_req / elapsed_time
rqm = rqs * 60

if (np.abs(stats[:, 1] - stats[:, 2]) <= 1).min() is False:
print(f'Did not generate requested number of tokens. '
f'Request {request_output_tokens:.0f}, '
f'but got {completion_tokens:.0f}')

print(
f'\n{"-" * 50}\nconcurrency: {concurrency}\n'
f'elapsed_time: {elapsed_time:.3f}s\n'
f'first_token latency(min, max, ave): '
f'{first_token_latency_min:.3f}s, {first_token_latency_max:.3f}s, '
f'{first_token_latency_ave:.3f}s\n'
f'number of prompt tokens: {prompt_tokens:.0f}\n'
f'number of completion tokens: {completion_tokens:.0f}\n'
f'token throughput (completion token): {completion_token_throughput:.3f} token/s\n' # noqa
f'token throughput (prompt + completion token): {total_token_throughput:.3f} token/s\n' # noqa
f'RPS (request per second): {rqs:.3f} req/s\n'
f'RPM (request per minute): {rqm:.3f} req/min\n'
f'{"-" * 50}\n')
for proc in procs:
proc.join()

Expand Down
Loading

0 comments on commit 529e56b

Please sign in to comment.