From edb7c6eccf41a378d2a05dc87f8c10924fd324a4 Mon Sep 17 00:00:00 2001
From: Lyu Han
Date: Mon, 4 Sep 2023 11:07:52 +0800
Subject: [PATCH] Fix profile_serving hung issue (#344)

* read data after start processes

* fix hang

* fix exceptions when request_output_len is 0
---
 benchmark/profile_serving.py        | 58 +++++++++++++++++------------
 lmdeploy/serve/turbomind/chatbot.py |  5 ++-
 2 files changed, 38 insertions(+), 25 deletions(-)

diff --git a/benchmark/profile_serving.py b/benchmark/profile_serving.py
index 16fed7460..8973352bc 100644
--- a/benchmark/profile_serving.py
+++ b/benchmark/profile_serving.py
@@ -1,4 +1,5 @@
 import json
+import logging
 import multiprocessing as mp
 import os
 import random
@@ -28,10 +29,8 @@ def encode(self, prompts: List):

 def infer(chatbot, session_id: int, req_que: mp.Queue, res_que: mp.Queue):
     stats = []
-    while not req_que.empty():
-        prompt, input_seqlen, output_seqlen = req_que.get()
-        print(f'request info: session {session_id}, '
-              f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}')
+    for prompt, input_seqlen, output_seqlen in iter(req_que.get,
+                                                    [None, None, None]):
         timestamps = []
         tokens = []
         start = time.perf_counter()
@@ -43,12 +42,13 @@ def infer(chatbot, session_id: int, req_que: mp.Queue, res_que: mp.Queue):
                                            sequence_end=True):
             timestamps.append(time.perf_counter())
             tokens.append(token)
-        chatbot.reset_session()

-        first_token_latency = timestamps[1] - start
-        token_latency = timestamps[-1] - timestamps[0]
+        first_token_latency = np.round(timestamps[1] - start, 3)
+        token_latency = np.round(timestamps[-1] - timestamps[0], 3)
         token = tokens[-1] - tokens[0]
         stats.append([first_token_latency, token, token_latency])
+        print(f'session {session_id}: '
+              f'input_seqlen {input_seqlen}, output_seqlen {output_seqlen}')
     res_que.put((session_id, stats))


@@ -73,6 +73,7 @@ def _infer(_chatbot, session_id):
     chatbots = [
         Chatbot(tritonserver_addr=tritonserver_addr,
                 ignore_eos=True,
+                log_level=logging.ERROR,
                 profile_generation=True) for _ in range(concurrency)
     ]
     procs = []
@@ -87,7 +88,7 @@


 def read_dataset(tokenizer_path: str, dataset_path: str, samples: int,
-                 session_len: int):
+                 session_len: int, que: mp.Queue):
     start = time.perf_counter()
     with open(dataset_path) as f:
         dataset = json.load(f)
@@ -119,12 +120,11 @@ def read_dataset(tokenizer_path: str, dataset_path: str, samples: int,
     if samples > 0:
         filtered_dataset = random.sample(filtered_dataset, samples)

-    que = mp.Queue()
     for data in filtered_dataset:
         que.put(data)
     print(f'elapsed time for filtering: '
           f'{round(time.perf_counter() - start, 2)} s')
-    return que, len(filtered_dataset)
+    return len(filtered_dataset)


 def main(tritonserver_addr: str,
@@ -134,32 +134,39 @@ def main(tritonserver_addr: str,
          session_len: int = 2048,
          samples: int = 1000):
     warmup(tritonserver_addr, concurrency, session_len - 1)
-    req_que, n_req = read_dataset(tokenizer_path, dataset_path, samples,
-                                  session_len)
+    req_que = mp.Queue()
     res_que = mp.Queue()
+
     procs = []
     _start = time.perf_counter()
     for i in range(concurrency):
         chatbot = Chatbot(tritonserver_addr=tritonserver_addr,
                           display=False,
                           profile_serving=True,
-                          ignore_eos=True)
+                          ignore_eos=True,
+                          log_level=logging.ERROR)
         proc = mp.Process(target=infer,
                           args=(chatbot, i + 1, req_que, res_que))
         procs.append(proc)
         proc.start()
-    for proc in procs:
-        proc.join()
-    _end = time.perf_counter()
-    elapsed_time = _end - _start
+
+    # read data and put it to queue
+    n_req = read_dataset(tokenizer_path, dataset_path, samples, session_len,
+                         req_que)
+    for i in range(concurrency):
+        req_que.put([None, None, None])

     stats = []
-    while not res_que.empty():
+    for i in range(concurrency):
         session_id, _stats = res_que.get()
         print(f'\n{"-" * 50}\n'
-              f'session {session_id} stats: \n{_stats}\n{"-" * 50}\n')
+              f'session {session_id}: processed reqs {len(_stats)}, '
+              f'stats: \n{_stats}\n{"-" * 50}\n')
         stats.append(np.array(_stats))

+    _end = time.perf_counter()
+    elapsed_time = _end - _start
+
     stats = np.concatenate(stats).reshape(-1, 3)

     first_token_latency_min = np.min(stats[:, 0], axis=0)
@@ -169,14 +176,17 @@ def main(tritonserver_addr: str,
     req_throughput = n_req / elapsed_time

     print(f'\n{"-" * 50}\nconcurrency: {concurrency}\n'
-          f'elapsed_time: {elapsed_time:.2f}s\n'
+          f'elapsed_time: {elapsed_time:.3f}s\n'
           f'first_token latency(min, max, ave): '
-          f'{first_token_latency_min:.2f}s, {first_token_latency_max:.2f}s, '
-          f'{first_token_latency_ave:.2f}s\n'
-          f'token throughput: {token_throughput:.2f} token/s\n'
-          f'req throughput: {req_throughput:.2f} req/s\n'
+          f'{first_token_latency_min:.3f}s, {first_token_latency_max:.3f}s, '
+          f'{first_token_latency_ave:.3f}s\n'
+          f'token throughput: {token_throughput:.3f} token/s\n'
+          f'req throughput: {req_throughput:.3f} req/s\n'
           f'{"-" * 50}\n')

+    for proc in procs:
+        proc.join()
+

 if __name__ == '__main__':
     fire.Fire(main)
diff --git a/lmdeploy/serve/turbomind/chatbot.py b/lmdeploy/serve/turbomind/chatbot.py
index c5048bd2b..1212d3459 100644
--- a/lmdeploy/serve/turbomind/chatbot.py
+++ b/lmdeploy/serve/turbomind/chatbot.py
@@ -631,7 +631,8 @@ def stream_consumer(postprocess, res_queue, session, n_input_token,
             output_ids = output_ids[:, :, n_input_token +
                                     preseq_length:sequence_length.squeeze(
                                     )]
-            last_token_id = output_ids[-1, -1, -1]
+            last_token_id = None if output_ids.shape[
+                -1] == 0 else output_ids[-1, -1, -1]
             if last_token_id == eos_id:
                 session.sequence_length = session.sequence_length - 1
                 output_ids = output_ids[:, :, :-1]
@@ -652,6 +653,8 @@ def stream_consumer(postprocess, res_queue, session, n_input_token,
                                output_ids.shape[-1])
         except Exception as e:
             logger.error(f'catch exception: {e}')
+            logger.error(
+                f'session {session.session_id}: prompt: {session.prompt}')

     # put session back to queue so that `_stream_infer` can update it in
     # `self.sessions`
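
Note on the hang fix in benchmark/profile_serving.py: the old code pre-filled
the request queue, had workers spin on `while not req_que.empty()` (racy
across processes), and called `proc.join()` before draining `res_que`; a child
process blocks until its queued result data is consumed, so join() could wait
forever. The patch starts the workers first, feeds the queue afterwards, ends
each worker with a `[None, None, None]` sentinel consumed via
`iter(req_que.get, sentinel)`, and joins only after all results are collected.
A minimal, self-contained sketch of that pattern follows; the `worker`
function and item counts are illustrative, not lmdeploy code.

import multiprocessing as mp


def worker(req_que: mp.Queue, res_que: mp.Queue):
    processed = 0
    # iter(callable, sentinel) calls req_que.get() repeatedly and stops
    # when the returned item equals the sentinel, so an idle worker
    # blocks on get() instead of exiting when the queue is momentarily
    # empty.
    for item in iter(req_que.get, None):
        processed += 1  # stand-in for real request handling
    res_que.put(processed)


if __name__ == '__main__':
    concurrency = 4
    req_que, res_que = mp.Queue(), mp.Queue()
    procs = [
        mp.Process(target=worker, args=(req_que, res_que))
        for _ in range(concurrency)
    ]
    for p in procs:
        p.start()
    # Produce only after the consumers are running; they block meanwhile.
    for i in range(100):
        req_que.put(i)
    # One sentinel per worker guarantees every worker wakes up and exits.
    for _ in range(concurrency):
        req_que.put(None)
    # Drain results BEFORE join(): a child does not exit until its queued
    # data is consumed, so joining first is the classic deadlock.
    total = sum(res_que.get() for _ in range(concurrency))
    print(f'{total} items processed')  # -> 100 items processed
    for p in procs:
        p.join()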
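
Note on the chatbot.py change: when `request_output_len` is 0, the sliced
`output_ids` has a zero-length last axis, and the unconditional
`output_ids[-1, -1, -1]` raises IndexError inside stream_consumer's try
block, which surfaced as the 'catch exception' log errors. A toy numpy
reproduction of the guard, with a dummy array standing in for the real
tensor and an illustrative `eos_id` value:

import numpy as np

# Stand-in for the decoded tensor when request_output_len == 0:
# batch 1, beam 1, zero generated tokens.
output_ids = np.empty((1, 1, 0), dtype=np.int64)

# Old behaviour: indexing a zero-length axis raises IndexError.
try:
    last_token_id = output_ids[-1, -1, -1]
except IndexError as e:
    print(f'old behaviour: {e}')

# Patched behaviour: fall back to None, so the eos_id comparison
# downstream is simply False and no token gets stripped.
eos_id = 2  # illustrative value
last_token_id = None if output_ids.shape[-1] == 0 else output_ids[-1, -1, -1]
assert last_token_id != eos_id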