Fix profile_serving hung issue (#344)
* read data after starting processes

* fix hang

* fix exceptions when request_output_len is 0
lvhan028 authored Sep 4, 2023
1 parent 9bfe03c · commit edb7c6e
Showing 2 changed files with 38 additions and 25 deletions.
58 changes: 34 additions & 24 deletions benchmark/profile_serving.py
@@ -1,4 +1,5 @@
import json
import logging
import multiprocessing as mp
import os
import random
@@ -28,10 +29,8 @@ def encode(self, prompts: List):

def infer(chatbot, session_id: int, req_que: mp.Queue, res_que: mp.Queue):
stats = []
while not req_que.empty():
prompt, input_seqlen, output_seqlen = req_que.get()
print(f'request info: session {session_id}, '
f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}')
for prompt, input_seqlen, output_seqlen in iter(req_que.get,
[None, None, None]):
timestamps = []
tokens = []
start = time.perf_counter()
@@ -43,12 +42,13 @@ def infer(chatbot, session_id: int, req_que: mp.Queue, res_que: mp.Queue):
sequence_end=True):
timestamps.append(time.perf_counter())
tokens.append(token)
chatbot.reset_session()

first_token_latency = timestamps[1] - start
token_latency = timestamps[-1] - timestamps[0]
first_token_latency = np.round(timestamps[1] - start, 3)
token_latency = np.round(timestamps[-1] - timestamps[0], 3)
token = tokens[-1] - tokens[0]
stats.append([first_token_latency, token, token_latency])
print(f'session {session_id}: '
f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}')
res_que.put((session_id, stats))
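
Note: the revised `infer` drains the request queue with the two-argument form `iter(callable, sentinel)`. It blocks in `req_que.get()` and stops only when the `[None, None, None]` sentinel arrives, instead of polling `req_que.empty()`, which is unreliable across processes and could end a worker before the producer has filled the queue. Below is a minimal standalone sketch of the same consumption pattern; the sentinel value mirrors the diff, while the worker body and request data are placeholders.

```python
# Minimal sketch of the sentinel-terminated consumer pattern used above.
import multiprocessing as mp


def consumer(req_que: mp.Queue, res_que: mp.Queue):
    handled = 0
    # iter(callable, sentinel) keeps calling req_que.get() (blocking) and stops
    # only when the returned item equals the sentinel, so the worker never
    # has to poll Queue.empty().
    for prompt, input_seqlen, output_seqlen in iter(req_que.get,
                                                    [None, None, None]):
        handled += 1  # real code would run inference on the prompt here
    res_que.put(handled)


if __name__ == '__main__':
    req_que, res_que = mp.Queue(), mp.Queue()
    proc = mp.Process(target=consumer, args=(req_que, res_que))
    proc.start()
    req_que.put(('hello', 5, 16))
    req_que.put(('world', 3, 8))
    req_que.put([None, None, None])   # one sentinel per consumer
    print('requests handled:', res_que.get())
    proc.join()
```

Because `iter()` compares each received item to the sentinel with `==`, the producer must put exactly one sentinel per consumer so that every worker terminates.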


@@ -73,6 +73,7 @@ def _infer(_chatbot, session_id):
chatbots = [
Chatbot(tritonserver_addr=tritonserver_addr,
ignore_eos=True,
log_level=logging.ERROR,
profile_generation=True) for _ in range(concurrency)
]
procs = []
@@ -87,7 +88,7 @@ def _infer(_chatbot, session_id):


def read_dataset(tokenizer_path: str, dataset_path: str, samples: int,
session_len: int):
session_len: int, que: mp.Queue):
start = time.perf_counter()
with open(dataset_path) as f:
dataset = json.load(f)
@@ -119,12 +120,11 @@ def read_dataset(tokenizer_path: str, dataset_path: str, samples: int,
if samples > 0:
filtered_dataset = random.sample(filtered_dataset, samples)

que = mp.Queue()
for data in filtered_dataset:
que.put(data)
print(f'elapsed time for filtering: '
f'{round(time.perf_counter() - start, 2)} s')
return que, len(filtered_dataset)
return len(filtered_dataset)
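
Note: `read_dataset` no longer creates and returns the queue; the caller owns it, and the function only fills it and reports how many samples were enqueued. A condensed sketch of that contract follows, with the tokenize-and-filter step reduced to a placeholder list (the real filtering logic is collapsed in this hunk).

```python
# Sketch of the revised read_dataset contract: the caller creates the queue,
# the function fills it and returns the sample count. Dataset contents here
# are placeholders, not the real ShareGPT-style data.
import multiprocessing as mp
import random
import time


def read_dataset_sketch(que: mp.Queue, samples: int = 0) -> int:
    start = time.perf_counter()
    # stand-in for the tokenize-and-filter step in the real script
    filtered_dataset = [('prompt A', 4, 16), ('prompt B', 6, 32),
                        ('prompt C', 3, 8)]
    if samples > 0:
        filtered_dataset = random.sample(filtered_dataset, samples)
    for data in filtered_dataset:
        que.put(data)
    print(f'elapsed time for filtering: '
          f'{round(time.perf_counter() - start, 2)} s')
    return len(filtered_dataset)


if __name__ == '__main__':
    req_que = mp.Queue()   # created by the caller (main), not by read_dataset
    n_req = read_dataset_sketch(req_que, samples=2)
    print(n_req, 'requests enqueued:', [req_que.get() for _ in range(n_req)])
```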


def main(tritonserver_addr: str,
@@ -134,32 +134,39 @@ def main(tritonserver_addr: str,
session_len: int = 2048,
samples: int = 1000):
warmup(tritonserver_addr, concurrency, session_len - 1)
req_que, n_req = read_dataset(tokenizer_path, dataset_path, samples,
session_len)
req_que = mp.Queue()
res_que = mp.Queue()

procs = []
_start = time.perf_counter()
for i in range(concurrency):
chatbot = Chatbot(tritonserver_addr=tritonserver_addr,
display=False,
profile_serving=True,
ignore_eos=True)
ignore_eos=True,
log_level=logging.ERROR)
proc = mp.Process(target=infer,
args=(chatbot, i + 1, req_que, res_que))
procs.append(proc)
proc.start()
for proc in procs:
proc.join()
_end = time.perf_counter()
elapsed_time = _end - _start

# read data and put it to queue
n_req = read_dataset(tokenizer_path, dataset_path, samples, session_len,
req_que)
for i in range(concurrency):
req_que.put([None, None, None])

stats = []
while not res_que.empty():
for i in range(concurrency):
session_id, _stats = res_que.get()
print(f'\n{"-" * 50}\n'
f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n')
f'session {session_id}: processed reqs {len(_stats)}, '
f'stats: \n{_stats}\n{"-" * 50}\n')
stats.append(np.array(_stats))

_end = time.perf_counter()
elapsed_time = _end - _start

stats = np.concatenate(stats).reshape(-1, 3)

first_token_latency_min = np.min(stats[:, 0], axis=0)
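
For reference, the printed summary can be derived from the concatenated `(N, 3)` stats array whose columns are `[first_token_latency, tokens, token_latency]`. The sketch below is a plausible reconstruction only: the exact throughput formulas are collapsed in this diff, and the numbers are illustrative, not real measurements.

```python
# Hedged sketch: one way to derive the printed metrics from the per-request
# stats array of shape (N, 3) = [first_token_latency, tokens, token_latency].
import numpy as np

stats = np.array([[0.120, 128, 1.450],
                  [0.095, 256, 2.730],
                  [0.143,  64, 0.810]])   # example rows, not real data
elapsed_time = 3.2                        # seconds, illustrative
n_req = stats.shape[0]

first_token_latency_min = np.min(stats[:, 0], axis=0)
first_token_latency_max = np.max(stats[:, 0], axis=0)
first_token_latency_ave = np.mean(stats[:, 0], axis=0)
token_throughput = np.sum(stats[:, 1]) / elapsed_time   # tokens per second (assumed formula)
req_throughput = n_req / elapsed_time                    # requests per second

print(f'first_token latency(min, max, ave): '
      f'{first_token_latency_min:.3f}s, {first_token_latency_max:.3f}s, '
      f'{first_token_latency_ave:.3f}s')
print(f'token throughput: {token_throughput:.3f} token/s')
print(f'req throughput: {req_throughput:.3f} req/s')
```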
@@ -169,14 +176,17 @@ def main(tritonserver_addr: str,
req_throughput = n_req / elapsed_time

print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n'
f'elapsed_time: {elapsed_time:.2f}s\n'
f'elapsed_time: {elapsed_time:.3f}s\n'
f'first_token latency(min, max, ave): '
f'{first_token_latency_min:.2f}s, {first_token_latency_max:.2f}s, '
f'{first_token_latency_ave:.2f}s\n'
f'token throughput: {token_throughput:.2f} token/s\n'
f'req throughput: {req_throughput:.2f} req/s\n'
f'{first_token_latency_min:.3f}s, {first_token_latency_max:.3f}s, '
f'{first_token_latency_ave:.3f}s\n'
f'token throughput: {token_throughput:.3f} token/s\n'
f'req throughput: {req_throughput:.3f} req/s\n'
f'{"-" * 50}\n')

for proc in procs:
proc.join()


if __name__ == '__main__':
fire.Fire(main)
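
Putting the pieces of the reworked `main` together: start one consumer process per concurrency slot first, only then feed the request queue, append one sentinel per consumer, read exactly `concurrency` result tuples instead of polling `res_que.empty()`, and join the workers last. The sketch below condenses that ordering; the worker body and request data are placeholders, not the real benchmark.

```python
# Condensed sketch of the reordered producer/consumer flow in main().
import multiprocessing as mp

SENTINEL = [None, None, None]


def worker(wid: int, req_que: mp.Queue, res_que: mp.Queue):
    stats = []
    for req in iter(req_que.get, SENTINEL):
        stats.append(req)                     # real code runs inference here
    res_que.put((wid, stats))


def run(concurrency: int = 2):
    req_que, res_que = mp.Queue(), mp.Queue()

    procs = [mp.Process(target=worker, args=(i + 1, req_que, res_que))
             for i in range(concurrency)]
    for p in procs:
        p.start()                             # consumers run before any data exists

    for req in [('a', 1, 8), ('b', 2, 8), ('c', 3, 8)]:
        req_que.put(req)                      # produce only after workers started
    for _ in range(concurrency):
        req_que.put(SENTINEL)                 # one sentinel per consumer

    for _ in range(concurrency):              # collect exactly `concurrency` results
        wid, stats = res_que.get()            # instead of polling res_que.empty()
        print(f'worker {wid} handled {len(stats)} requests')

    for p in procs:
        p.join()                              # join after the results are drained


if __name__ == '__main__':
    run()
```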
5 changes: 4 additions & 1 deletion lmdeploy/serve/turbomind/chatbot.py
@@ -631,7 +631,8 @@ def stream_consumer(postprocess, res_queue, session, n_input_token,
output_ids = output_ids[:, :, n_input_token +
preseq_length:sequence_length.squeeze(
)]
last_token_id = output_ids[-1, -1, -1]
last_token_id = None if output_ids.shape[
-1] == 0 else output_ids[-1, -1, -1]
if last_token_id == eos_id:
session.sequence_length = session.sequence_length - 1
output_ids = output_ids[:, :, :-1]
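
Note: the change in `chatbot.py` guards the last-token lookup. When `request_output_len` is 0, the sliced `output_ids` tensor is empty along its last axis and indexing `[-1, -1, -1]` would raise an `IndexError`. A small sketch of the same guard follows; the array contents are illustrative.

```python
# Sketch of the guard added above: skip the last-token lookup when the
# sliced output tensor has no generated tokens.
import numpy as np

eos_id = 2

for output_ids in (np.array([[[5, 9, 2]]]),            # normal case, ends with EOS
                   np.empty((1, 1, 0), dtype=np.int64)):  # request_output_len == 0
    last_token_id = None if output_ids.shape[-1] == 0 else output_ids[-1, -1, -1]
    if last_token_id == eos_id:
        output_ids = output_ids[:, :, :-1]   # drop the trailing EOS token
    print('tokens kept:', output_ids.shape[-1], 'last token:', last_token_id)
```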
@@ -652,6 +653,8 @@ def stream_consumer(postprocess, res_queue, session, n_input_token,
output_ids.shape[-1])
except Exception as e:
logger.error(f'catch exception: {e}')
logger.error(
f'session {session.session_id}: prompt: {session.prompt}')

# put session back to queue so that `_stream_infer` can update it in
# `self.sessions`
