Commit f186bc1

up
juncaipeng committed Sep 23, 2024
1 parent 030d5b5 commit f186bc1
Showing 5 changed files with 26 additions and 10 deletions.
3 changes: 2 additions & 1 deletion llm/benchmark/analyse.py
@@ -197,7 +197,8 @@ def save_output_text(result_list, input_path):
         output_text = ""
         for i in output_list:
             output_text += i['token']
-        dict_obj = {'req_id': result.req_id, 'input_text': result.input_text, 'output_text': output_text}
+        #dict_obj = {'req_id': result.req_id, 'input_text': result.input_text, 'output_text': output_text}
+        dict_obj = {'input_text': result.input_text, 'output_text': output_text}
         out_file.write(json.dumps(dict_obj, ensure_ascii=False) + "\n")
     print(f"output save in {output_path}")

9 changes: 4 additions & 5 deletions llm/server/server/engine/engine.py
@@ -26,7 +26,7 @@
 from server.engine.resource_manager import ResourceManager
 from server.engine.task_queue_manager import (TaskQueueManager,
                                               launch_task_queue_manager)
-from server.engine.token_processor import TokenProcessor
+from server.engine.out_processor import OutProcessor
 from server.utils import model_server_logger


@@ -37,8 +37,8 @@ class Engine(object):
     def __init__(self, cfg):
         self.cfg = cfg
         self.resource_manager = ResourceManager(self.cfg)
-        self.token_processor = TokenProcessor(self.cfg)
-        self.token_processor.set_resource_manager(self.resource_manager)
+        self.out_processor = OutProcessor(self.cfg)
+        self.out_processor.set_resource_manager(self.resource_manager)

         self._init_engine_flags()

@@ -273,11 +273,10 @@ def _start_task_queue_manager(self):
         """
         p = multiprocessing.Process(target=launch_task_queue_manager, args=(self.cfg.infer_port, self.cfg.mp_num))
         p.start()
-        time.sleep(0.3)
         if p.is_alive():
             model_server_logger.info("start tasks queue service successfully")
         else:
-            error_msg = "Failed to start tasks queue service, please check " \
+            error_msg = "Failed to start task queue manager, please check " \
                 "the log/task_queue_manager.log for details"
             model_server_logger.info(error_msg)
             raise Exception(error_msg)
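With the time.sleep(0.3) removed, p.is_alive() only confirms that the child process was spawned, not that the queue service is already accepting connections; the client-side retry loop added in task_queue_manager.py below is what absorbs that startup race. An alternative (not in this commit) would be to poll the port before declaring success. A minimal sketch, assuming the service listens on a local TCP port:

import socket
import time

def wait_for_port(host: str, port: int, timeout: float = 5.0) -> bool:
    # Poll until the TCP port accepts connections or the timeout expires.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=0.5):
                return True
        except OSError:
            time.sleep(0.1)
    return False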
2 changes: 1 addition & 1 deletion llm/server/server/engine/{token_processor.py → out_processor.py}
@@ -25,7 +25,7 @@
 from server.utils import datetime_diff, model_server_logger, monitor_logger


-class TokenProcessor(object):
+class OutProcessor(object):
     """
     get Token/Score from Paddle inference engine
     """
20 changes: 18 additions & 2 deletions llm/server/server/engine/task_queue_manager.py
@@ -21,7 +21,7 @@

 from server.utils import get_logger

-logger = get_logger("infer_server", "task_queue_manager.log")
+logger = get_logger("task_queue_manager", "task_queue_manager.log")


class QueueManager(BaseManager):
@@ -53,7 +53,22 @@ def __init__(self, rank=0, mp_num=8, port=56666):
         self.client_manager = QueueManager(address=('127.0.0.1', port),
                                            authkey=b'infer_queue'
                                            )
-        self.client_manager.connect()
+
+        retries = 10
+        delay = 0.5
+        for attempt in range(1, retries + 1):
+            try:
+                self.client_manager.connect()
+                logger.info(f"connect client manager success on attempt {attempt}")
+                break
+            except ConnectionRefusedError:
+                if attempt == retries:
+                    logger.error(f"failed to connect after {retries} attempts.")
+                    raise
+                else:
+                    logger.warning(f"connection attempt {attempt} failed, retrying in {delay} seconds...")
+                    time.sleep(delay)
+
         self.list = self.client_manager.get_list()
         self.value = self.client_manager.get_value()
         self.lock = self.client_manager.get_lock()
@@ -163,3 +178,4 @@ def launch_task_queue_manager(port, num_workers):
     except Exception as e:
         logger.error(f"launch queue service failed, error_msg: {e}")
         raise e
+    logger.error("task queue manager exit")
2 changes: 1 addition & 1 deletion llm/server/server/triton_server.py
@@ -288,7 +288,7 @@ def _send_output(self):
         """
         while True:
             try:
-                batch_result = self.engine.token_processor.out_queue.get()
+                batch_result = self.engine.out_processor.out_queue.get()
                 for result in batch_result:
                     req_id = result["req_id"]
                     is_end = result.get("is_end", 0)
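The consumer pattern in _send_output, shown in isolation: a blocking get() on a shared queue followed by a per-request dispatch loop. A standalone sketch using a plain queue.Queue with field names taken from the hunk above, not the server's actual out_queue:

import queue

# One batch is a list of per-request result dicts.
out_queue = queue.Queue()
out_queue.put([{"req_id": "demo-0", "is_end": 1}])

batch_result = out_queue.get()  # blocks until a batch arrives
for result in batch_result:
    req_id = result["req_id"]
    is_end = result.get("is_end", 0)
    print(f"req {req_id} is_end={is_end}")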
