From 84bb411a7c6e36d7d9765c8f3ead93d82a9c6406 Mon Sep 17 00:00:00 2001 From: chenjian Date: Wed, 8 Nov 2023 11:11:59 +0800 Subject: [PATCH 1/9] add inference load balancer for fastdeploy llm --- llm/fastdeploy_ic/__init__.py | 0 llm/fastdeploy_ic/config.py | 26 ++++ llm/fastdeploy_ic/data/__init__.py | 0 llm/fastdeploy_ic/data/manager.py | 103 +++++++++++++++ llm/fastdeploy_ic/proto/__init__.py | 3 + llm/fastdeploy_ic/proto/ic.proto | 99 ++++++++++++++ llm/fastdeploy_ic/proto/ic_pb2.py | 41 ++++++ llm/fastdeploy_ic/proto/ic_pb2_grpc.py | 175 +++++++++++++++++++++++++ llm/fastdeploy_ic/server/__init__.py | 0 llm/fastdeploy_ic/server/api.py | 167 +++++++++++++++++++++++ llm/fastdeploy_ic/server/launcher.py | 72 ++++++++++ llm/fastdeploy_ic/utils.py | 155 ++++++++++++++++++++++ 12 files changed, 841 insertions(+) create mode 100644 llm/fastdeploy_ic/__init__.py create mode 100644 llm/fastdeploy_ic/config.py create mode 100644 llm/fastdeploy_ic/data/__init__.py create mode 100644 llm/fastdeploy_ic/data/manager.py create mode 100644 llm/fastdeploy_ic/proto/__init__.py create mode 100644 llm/fastdeploy_ic/proto/ic.proto create mode 100644 llm/fastdeploy_ic/proto/ic_pb2.py create mode 100644 llm/fastdeploy_ic/proto/ic_pb2_grpc.py create mode 100644 llm/fastdeploy_ic/server/__init__.py create mode 100644 llm/fastdeploy_ic/server/api.py create mode 100644 llm/fastdeploy_ic/server/launcher.py create mode 100644 llm/fastdeploy_ic/utils.py diff --git a/llm/fastdeploy_ic/__init__.py b/llm/fastdeploy_ic/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/llm/fastdeploy_ic/config.py b/llm/fastdeploy_ic/config.py new file mode 100644 index 0000000000..b7f75f5c31 --- /dev/null +++ b/llm/fastdeploy_ic/config.py @@ -0,0 +1,26 @@ +import os +import json + +class GlobalConfig(): + """ global config """ + + def __init__(self): + """init + Args: + None + Returns: + None + """ + # Redis + self.redis_host = os.getenv('REDIS_HOST', default="localhost") + self.redis_port = int(os.getenv('REDIS_PORT', default="6379")) + self.redis_db = int(os.getenv('REDIS_DB', default="0")) + self.redis_username = os.getenv('REDIS_USERNAME', default=None) + self.redis_password = os.getenv('REDIS_PASSWORD', default=None) + + # Response + self.resonpse_timeout = int(os.getenv('RESPONSE_TIMEOUT', default="120")) + + # Logger + self.log_dir = os.getenv('IC_LOG_DIR', default='ic_logs') + \ No newline at end of file diff --git a/llm/fastdeploy_ic/data/__init__.py b/llm/fastdeploy_ic/data/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/llm/fastdeploy_ic/data/manager.py b/llm/fastdeploy_ic/data/manager.py new file mode 100644 index 0000000000..d07c209fc6 --- /dev/null +++ b/llm/fastdeploy_ic/data/manager.py @@ -0,0 +1,103 @@ + +import json +import math + +import aioredis + +import fastdeploy_ic.proto.ic_pb2 as ic_pb2 +from fastdeploy_ic.utils import get_logger + +logger = get_logger("data_manager", "ic_data_manager.log") + +class DataManager: + def __init__(self, redis_conf) -> None: + self.client = aioredis.Redis(**redis_conf) + self.internal_check_key_prefix = '__keymap_' + + async def check_req_id_exist(self, model_id, req_id): + key = '{}{}'.format(self.internal_check_key_prefix, model_id) + logger.info("check_req_id_exist: key: {} value: {}".format(key, req_id)) + await self.client.sismember(key, req_id) + + async def add_req_id_to_map(self, model_id, req_id): + key = '{}{}'.format(self.internal_check_key_prefix, model_id) + logger.info("add_req_id_to_map: key: {} value: {}".format(key, 
req_id)) + await self.client.sadd(key, req_id) + + async def remove_req_id_from_map(self, model_id, req_id): + key = '{}{}'.format(self.internal_check_key_prefix, model_id) + logger.info("remove_req_id_from_map: key: {} value: {}".format(key, req_id)) + await self.client.srem(key, req_id) + + async def enque_request(self, model_id, req, to_end=True): + serialized_req = req.SerializeToString() + # key = model_id + logger.info("enque_request: key: {} value: {}".format(model_id, req)) + if to_end: + await self.client.rpush(model_id, serialized_req) + else: + await self.client.lpush(model_id, serialized_req) + + async def deque_request(self, model_id): + data = await self.client.lpop(model_id) + if data is not None: + data = ic_pb2.ModelInferRequest.FromString(data) + logger.info("deque_request: key: {} value: {}".format(model_id, data)) + return data + + async def remove_request(self, model_id, req): + serialized_req = req.SerializeToString() + logger.info("remove_request: key: {} value: {}".format(model_id, req)) + await self.client.lrem(model_id, 1, serialized_req) + + async def enque_response(self, model_id, req_id, res, to_end=True): + serialized_res = res.SerializeToString() + key = '{}/{}'.format(model_id, req_id) + logger.info("enque_response: key: {} value: {}".format(key, res)) + if to_end: + await self.client.rpush(key, serialized_res) + else: + await self.client.lpush(key, serialized_res) + + async def deque_response(self, model_id, req_id): + key = '{}/{}'.format(model_id, req_id) + data = await self.client.lpop(key) + if data is not None: + data = ic_pb2.ModelInferResponse.FromString(data) + logger.info("deque_response: key: {} value: {}".format(key, data)) + return data + + async def clear_response(self, model_id, req_id): + key = '{}/{}'.format(model_id, req_id) + logger.info("clear_response: key: {}".format(key)) + await self.client.delete(key) + + async def get_requests_by_number(self, model_id, max_request_num): + # return requests by ByRequest strategy + requests = [] + for i in range(max_request_num): + request = await self.deque_request(model_id) + if request is not None: + requests.append(request) + else: + break + logger.info("get_requests_by_number: model_id: {} length: {}".format(model_id, len(requests))) + return requests + + async def get_requests_by_block(self, model_id, max_request_num, block_num, block_size, dec_token_num): + # return requests by ByToken strategy + requests = [] + left_block_num = block_num + for i in range(max_request_num): + request = await self.deque_request(model_id) + if request is not None: + text_words_num = json.loads(request.input)['text_words_num'] + need_block_num = math.ceil((text_words_num + dec_token_num)/block_size) + if need_block_num < left_block_num: + requests.append(request) + left_block_num -= need_block_num + else: + await self.enque_request(model_id, request, to_end=False) + break + logger.info("get_requests_by_block: model_id: {} length: {}".format(model_id, len(requests))) + return requests \ No newline at end of file diff --git a/llm/fastdeploy_ic/proto/__init__.py b/llm/fastdeploy_ic/proto/__init__.py new file mode 100644 index 0000000000..c775de9280 --- /dev/null +++ b/llm/fastdeploy_ic/proto/__init__.py @@ -0,0 +1,3 @@ +import sys +import os +sys.path.append(os.path.dirname(__file__)) \ No newline at end of file diff --git a/llm/fastdeploy_ic/proto/ic.proto b/llm/fastdeploy_ic/proto/ic.proto new file mode 100644 index 0000000000..6a6cf2d261 --- /dev/null +++ b/llm/fastdeploy_ic/proto/ic.proto @@ -0,0 +1,99 @@ +syntax 
= "proto3";
+package language_inference;
+
+// Inference Server GRPC endpoints.
+service GRPCInferenceService
+{
+    // Entry point for model inference requests, called by the upstream dispatcher.
+    // Takes a single request and streams back multiple responses.
+    rpc ModelStreamInfer(ModelInferRequest) returns (stream ModelInferResponse) {}
+
+    // Pulls pending requests, called by the inference server.
+    rpc ModelFetchRequest(ModelFetchRequestParams) returns (ModelFetchRequestResult) {}
+
+    // Sends back the results of a request, called by the inference server.
+    // Responses are sent as a stream.
+    rpc ModelSendResponse(stream ModelInferResponse) returns (ModelSendResponseResult) {}
+
+    // Sends back the results of requests in batches, called by the inference server.
+    // Responses are sent as a stream.
+    rpc ModelSendResponseList(stream ModelInferResponseList) returns (ModelSendResponseResult) {}
+}
+
+message ModelFetchRequestParams
+{
+    // Globally unique model ids.
+    repeated string model_id = 1;
+
+    // Maximum number of requests returned per fetch.
+    int32 max_request_num = 2;
+
+    FetchStrategy strategy = 3;
+
+    ByTokenParams by_token_params = 4;
+}
+// Formula used when fetching requests by token count:
+// blocks needed per query: block_num = ceil((text_words_num + dec_token_num)/block_size)
+
+enum FetchStrategy {
+    // Fetch requests by request count.
+    ByRequest = 0; // default
+
+    // Fetch requests by token count.
+    ByToken = 1;
+}
+
+message ByTokenParams
+{
+    // Number of available blocks.
+    int32 block_num = 1;
+
+    // Number of tokens each block can hold.
+    int32 block_size = 2;
+
+    // Number of tokens reserved for the output of each query.
+    int32 dec_token_num = 3;
+}
+
+message ModelFetchRequestResult
+{
+    // The fetched requests.
+    repeated ModelInferRequest requests = 1;
+}
+
+// The return value of SendResponse can be ignored.
+message ModelSendResponseResult {
+}
+
+message ModelInferRequest
+{
+    // Unique model id.
+    string model_id = 1;
+
+    // Request id, must be globally unique.
+    string request_id = 2;
+
+    // Trace id linking upstream and downstream logs, used for troubleshooting.
+    string trace_id = 3;
+
+    // Input to the language model.
+    string input = 4;
+}
+
+message ModelInferResponseList{
+    repeated ModelInferResponse response_list = 1;
+}
+
+message ModelInferResponse
+{
+    // Unique request id.
+    string request_id = 1;
+
+    // Id of the returned sentence (its position in the stream), used for deduplication and ordering.
+    int32 sentence_id = 2;
+
+    // Output of the language model.
+    string output = 3;
+}
+
+
diff --git a/llm/fastdeploy_ic/proto/ic_pb2.py b/llm/fastdeploy_ic/proto/ic_pb2.py
new file mode 100644
index 0000000000..1c4793d2f4
--- /dev/null
+++ b/llm/fastdeploy_ic/proto/ic_pb2.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
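+# (Editorial note, assumed invocation) ic_pb2.py and ic_pb2_grpc.py can be
+# regenerated from ic.proto with the grpcio-tools package, e.g.:
+#   cd llm/fastdeploy_ic/proto && python -m grpc_tools.protoc -I. \
+#       --python_out=. --grpc_python_out=. ic.proto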
+# source: ic.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x08ic.proto\x12\x12language_inference\"\xb5\x01\n\x17ModelFetchRequestParams\x12\x10\n\x08model_id\x18\x01 \x03(\t\x12\x17\n\x0fmax_request_num\x18\x02 \x01(\x05\x12\x33\n\x08strategy\x18\x03 \x01(\x0e\x32!.language_inference.FetchStrategy\x12:\n\x0f\x62y_token_params\x18\x04 \x01(\x0b\x32!.language_inference.ByTokenParams\"M\n\rByTokenParams\x12\x11\n\tblock_num\x18\x01 \x01(\x05\x12\x12\n\nblock_size\x18\x02 \x01(\x05\x12\x15\n\rdec_token_num\x18\x03 \x01(\x05\"R\n\x17ModelFetchRequestResult\x12\x37\n\x08requests\x18\x01 \x03(\x0b\x32%.language_inference.ModelInferRequest\"\x19\n\x17ModelSendResponseResult\"Z\n\x11ModelInferRequest\x12\x10\n\x08model_id\x18\x01 \x01(\t\x12\x12\n\nrequest_id\x18\x02 \x01(\t\x12\x10\n\x08trace_id\x18\x03 \x01(\t\x12\r\n\x05input\x18\x04 \x01(\t\"W\n\x16ModelInferResponseList\x12=\n\rresponse_list\x18\x01 \x03(\x0b\x32&.language_inference.ModelInferResponse\"M\n\x12ModelInferResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x13\n\x0bsentence_id\x18\x02 \x01(\x05\x12\x0e\n\x06output\x18\x03 \x01(\t*+\n\rFetchStrategy\x12\r\n\tByRequest\x10\x00\x12\x0b\n\x07\x42yToken\x10\x01\x32\xd2\x03\n\x14GRPCInferenceService\x12\x65\n\x10ModelStreamInfer\x12%.language_inference.ModelInferRequest\x1a&.language_inference.ModelInferResponse\"\x00\x30\x01\x12o\n\x11ModelFetchRequest\x12+.language_inference.ModelFetchRequestParams\x1a+.language_inference.ModelFetchRequestResult\"\x00\x12l\n\x11ModelSendResponse\x12&.language_inference.ModelInferResponse\x1a+.language_inference.ModelSendResponseResult\"\x00(\x01\x12t\n\x15ModelSendResponseList\x12*.language_inference.ModelInferResponseList\x1a+.language_inference.ModelSendResponseResult\"\x00(\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'ic_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_FETCHSTRATEGY']._serialized_start=666 + _globals['_FETCHSTRATEGY']._serialized_end=709 + _globals['_MODELFETCHREQUESTPARAMS']._serialized_start=33 + _globals['_MODELFETCHREQUESTPARAMS']._serialized_end=214 + _globals['_BYTOKENPARAMS']._serialized_start=216 + _globals['_BYTOKENPARAMS']._serialized_end=293 + _globals['_MODELFETCHREQUESTRESULT']._serialized_start=295 + _globals['_MODELFETCHREQUESTRESULT']._serialized_end=377 + _globals['_MODELSENDRESPONSERESULT']._serialized_start=379 + _globals['_MODELSENDRESPONSERESULT']._serialized_end=404 + _globals['_MODELINFERREQUEST']._serialized_start=406 + _globals['_MODELINFERREQUEST']._serialized_end=496 + _globals['_MODELINFERRESPONSELIST']._serialized_start=498 + _globals['_MODELINFERRESPONSELIST']._serialized_end=585 + _globals['_MODELINFERRESPONSE']._serialized_start=587 + _globals['_MODELINFERRESPONSE']._serialized_end=664 + _globals['_GRPCINFERENCESERVICE']._serialized_start=712 + _globals['_GRPCINFERENCESERVICE']._serialized_end=1178 +# @@protoc_insertion_point(module_scope) diff --git a/llm/fastdeploy_ic/proto/ic_pb2_grpc.py b/llm/fastdeploy_ic/proto/ic_pb2_grpc.py new file mode 
100644 index 0000000000..fb10ae809a --- /dev/null +++ b/llm/fastdeploy_ic/proto/ic_pb2_grpc.py @@ -0,0 +1,175 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import ic_pb2 as ic__pb2 + + +class GRPCInferenceServiceStub(object): + """Inference Server GRPC endpoints. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.ModelStreamInfer = channel.unary_stream( + '/language_inference.GRPCInferenceService/ModelStreamInfer', + request_serializer=ic__pb2.ModelInferRequest.SerializeToString, + response_deserializer=ic__pb2.ModelInferResponse.FromString, + ) + self.ModelFetchRequest = channel.unary_unary( + '/language_inference.GRPCInferenceService/ModelFetchRequest', + request_serializer=ic__pb2.ModelFetchRequestParams.SerializeToString, + response_deserializer=ic__pb2.ModelFetchRequestResult.FromString, + ) + self.ModelSendResponse = channel.stream_unary( + '/language_inference.GRPCInferenceService/ModelSendResponse', + request_serializer=ic__pb2.ModelInferResponse.SerializeToString, + response_deserializer=ic__pb2.ModelSendResponseResult.FromString, + ) + self.ModelSendResponseList = channel.stream_unary( + '/language_inference.GRPCInferenceService/ModelSendResponseList', + request_serializer=ic__pb2.ModelInferResponseList.SerializeToString, + response_deserializer=ic__pb2.ModelSendResponseResult.FromString, + ) + + +class GRPCInferenceServiceServicer(object): + """Inference Server GRPC endpoints. + """ + + def ModelStreamInfer(self, request, context): + """模型推理请求入口,给上层dispatch调用 + 输入一个请求,流式返回多个response + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ModelFetchRequest(self, request, context): + """拉取一个请求,给inference server调用 + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ModelSendResponse(self, request_iterator, context): + """发送请求的返回结果,给inference server调用 + response是流式的发送 + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ModelSendResponseList(self, request_iterator, context): + """批量发送请求的返回结果,给inference server调用 + response是流式的发送 + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_GRPCInferenceServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'ModelStreamInfer': grpc.unary_stream_rpc_method_handler( + servicer.ModelStreamInfer, + request_deserializer=ic__pb2.ModelInferRequest.FromString, + response_serializer=ic__pb2.ModelInferResponse.SerializeToString, + ), + 'ModelFetchRequest': grpc.unary_unary_rpc_method_handler( + servicer.ModelFetchRequest, + request_deserializer=ic__pb2.ModelFetchRequestParams.FromString, + response_serializer=ic__pb2.ModelFetchRequestResult.SerializeToString, + ), + 'ModelSendResponse': grpc.stream_unary_rpc_method_handler( + servicer.ModelSendResponse, + request_deserializer=ic__pb2.ModelInferResponse.FromString, + response_serializer=ic__pb2.ModelSendResponseResult.SerializeToString, + ), + 'ModelSendResponseList': grpc.stream_unary_rpc_method_handler( + servicer.ModelSendResponseList, + 
request_deserializer=ic__pb2.ModelInferResponseList.FromString, + response_serializer=ic__pb2.ModelSendResponseResult.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'language_inference.GRPCInferenceService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class GRPCInferenceService(object): + """Inference Server GRPC endpoints. + """ + + @staticmethod + def ModelStreamInfer(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream(request, target, '/language_inference.GRPCInferenceService/ModelStreamInfer', + ic__pb2.ModelInferRequest.SerializeToString, + ic__pb2.ModelInferResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ModelFetchRequest(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/language_inference.GRPCInferenceService/ModelFetchRequest', + ic__pb2.ModelFetchRequestParams.SerializeToString, + ic__pb2.ModelFetchRequestResult.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ModelSendResponse(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_unary(request_iterator, target, '/language_inference.GRPCInferenceService/ModelSendResponse', + ic__pb2.ModelInferResponse.SerializeToString, + ic__pb2.ModelSendResponseResult.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ModelSendResponseList(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_unary(request_iterator, target, '/language_inference.GRPCInferenceService/ModelSendResponseList', + ic__pb2.ModelInferResponseList.SerializeToString, + ic__pb2.ModelSendResponseResult.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/llm/fastdeploy_ic/server/__init__.py b/llm/fastdeploy_ic/server/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/llm/fastdeploy_ic/server/api.py b/llm/fastdeploy_ic/server/api.py new file mode 100644 index 0000000000..6b5f22e921 --- /dev/null +++ b/llm/fastdeploy_ic/server/api.py @@ -0,0 +1,167 @@ +import time + +import grpc +import json +import asyncio + +import fastdeploy_ic.proto.ic_pb2_grpc as ic_pb2_grpc +import fastdeploy_ic.proto.ic_pb2 as ic_pb2 +from fastdeploy_ic.data.manager import DataManager +from fastdeploy_ic.config import GlobalConfig +from fastdeploy_ic.utils import get_logger + +logger = get_logger("ic_server", "ic_server.log") + +global_config = GlobalConfig() +redis_config = { + 'host': global_config.redis_host, + 'port': global_config.redis_port, + 'db': global_config.redis_db, + 'username': 
global_config.redis_username, + 'password': global_config.redis_password +} +data_manager = DataManager(redis_config) + +class GRPCInferenceServiceServicer(ic_pb2_grpc.GRPCInferenceServiceServicer): + async def ModelStreamInfer(self, request, context): + """ + Provided for request sender. + """ + try: + model_id = request.model_id + req_id = json.loads(request.input)['req_id'] + # Check whether req_id is repeated + # Warning: We only simply check whether there is any same req_id has been in, + # but we can not prevent two requests with the same req_id coming simultaneously. + # To achieve this, we should add lock to query and insert query into redis, which will influence performance. + # Currently, we assume different req_ids are confirmed by users. + if await data_manager.check_req_id_exist(model_id, req_id): + logger.info("ModelStreamInfer: req_id {}: has existed in other task".format(req_id)) + await context.abort(grpc.StatusCode.INVALID_ARGUMENT, "ModelStreamInfer: req_id {}: has existed in other task".format(req_id)) + # 1. push request to redis + await data_manager.enque_request(model_id, request) + # 2. response stream results + response_start_time = time.time() + while True: + if time.time() - response_start_time > global_config.resonpse_timeout: + if data_manager.check_req_id_exist(model_id, req_id): # clear resource about this req + await data_manager.remove_request(model_id, request) + await data_manager.clear_response(model_id, req_id) + await data_manager.remove_req_id_from_map(model_id, req_id) + logger.info("ModelStreamInfer: req_id {}: Get response from inference server timeout".format(req_id)) + await context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, "ModelStreamInfer: req_id {}: Get response from inference server timeout".format(req_id)) + data = await data_manager.deque_response(model_id, req_id) + if data is None: + await asyncio.sleep(1) + continue + logger.info("ModelStreamInfer: req_id {}: response data: {}".format(req_id, data)) + yield data + try: + if json.loads(data.output)['is_end'] == 1: # this request is done + # clear resource about this req, only req_id in map should be removed + await data_manager.remove_req_id_from_map(model_id, req_id) + return + except: + if data_manager.check_req_id_exist(model_id, req_id): # clear resource about this req + await data_manager.clear_response(model_id, req_id) + await data_manager.remove_req_id_from_map(model_id, req_id) + logger.info("ModelStreamInfer: req_id {}: Failed to read response data from inference server".format(req_id)) + await context.abort(grpc.StatusCode.INTERNAL, "ModelStreamInfer: req_id {}: Failed to read response data from inference server".format(req_id)) + except Exception as e: + # if redis operation failed, should arrive here + # Log the error message, and signal users internal error (we can not expose origin redis error to users) + logger.info("ModelStreamInfer: exception: {}".format(e)) + await context.abort(grpc.StatusCode.INTERNAL, "Internal error happened") + + async def ModelFetchRequest(self, request, context): + """ + Provide for inference service. + """ + # provide two types for providing tasks + # 1. ByRequest + # 2. 
ByToken + try: + model_ids = request.model_id + strategy = request.strategy + requests = [] + for model_id in model_ids: + if strategy == ic_pb2.FetchStrategy.ByRequest: + requests.extend(await data_manager.get_requests_by_number(model_id, request.max_request_num)) + + else: + by_token_params = request.by_token_params + requests.extend(await data_manager.get_requests_by_block(model_id, request.max_request_num, + by_token_params.block_num, by_token_params.block_size, by_token_params.dec_token_num)) + + fetch_request_result = ic_pb2.ModelFetchRequestResult() + fetch_request_result.requests.extend(requests) + logger.info("ModelFetchRequest: return requests: {}".format(requests)) + except Exception as e: + # if operation failed, should arrive here + # Log the error message, and signal users internal error + logger.info("ModelFetchRequest: exception: {}".format(e)) + await context.abort(grpc.StatusCode.INTERNAL, "Internal error happened") + return fetch_request_result + + + async def ModelSendResponse(self, response_iterator, context): + """ + Provide for inference service. + """ + # Get response from inference server + try: + response_start_time = time.time() + async for response in response_iterator: + try: + res = json.loads(response.output) + model_id = res['ic_req_data']['model_id'] + req_id = res['req_id'] + except: + logger.info("ModelSendResponse: req_id {}: Failed to read response data from inference server".format(req_id)) + await context.abort(grpc.StatusCode.INTERNAL, "ModelSendResponse: req_id {}: Failed to read response data from inference server".format(req_id)) + await data_manager.enque_response(model_id, req_id, response) + logger.info("ModelSendResponse: req_id {}: response data: {}".format(req_id, res)) + if res['is_end'] == 1: + return ic_pb2.ModelSendResponseResult() + if time.time() - response_start_time > global_config.resonpse_timeout: + await data_manager.clear_response(model_id, req_id) + logger.info("ModelSendResponse: req_id {}: Get response from inference server timeout".format(req_id)) + await context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, "ModelSendResponse: req_id {}: Get response from inference server timeout".format(req_id)) + except Exception as e: + # if operation failed, should arrive here + # Log the error message, and signal users internal error + logger.info("ModelSendResponse: exception: {}".format(e)) + await context.abort(grpc.StatusCode.INTERNAL, "Internal error happened") + + async def ModelSendResponseList(self, response_list_iterator, context): + """ + Provide for inference service. 
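+        Receives a stream of response batches from the inference server and
+        enqueues every response into the per-request response queue in Redis
+        (same flow as ModelSendResponse, but for batched responses).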
+ """ + # Get response from inference server + try: + response_start_time = time.time() + async for response_list in response_list_iterator: + for response in response_list: + try: + res = json.loads(response.output) + model_id = res['ic_req_data']['model_id'] + req_id = res['req_id'] + except: + logger.info("ModelSendResponseList: req_id {}: Failed to read response data from inference server".format(req_id)) + await context.abort(grpc.StatusCode.INTERNAL, "ModelSendResponseList: req_id {}: Failed to read response data from inference server".format(req_id)) + await data_manager.enque_response(model_id, req_id, response) + logger.info("ModelSendResponseList: req_id {}: response data: {}".format(req_id, res)) + if res['is_end'] == 1: + break + if time.time() - response_start_time > global_config.resonpse_timeout: + await data_manager.clear_response(model_id, req_id) + logger.info("ModelSendResponseList: req_id {}: Get response from inference server timeout".format(req_id)) + await context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, "ModelSendResponseList: req_id {}: Get response from inference server timeout".format(req_id)) + except Exception as e: + # if operation failed, should arrive here + # Log the error message, and signal users internal error + logger.info("ModelSendResponseList: exception: {}".format(e)) + await context.abort(grpc.StatusCode.INTERNAL, "Internal error happened") + return ic_pb2.ModelSendResponseResult() + + diff --git a/llm/fastdeploy_ic/server/launcher.py b/llm/fastdeploy_ic/server/launcher.py new file mode 100644 index 0000000000..302bad737d --- /dev/null +++ b/llm/fastdeploy_ic/server/launcher.py @@ -0,0 +1,72 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from concurrent import futures +import contextlib +import multiprocessing +import socket +import sys +import asyncio + +import grpc + +import fastdeploy_ic.proto.ic_pb2_grpc as ic_pb2_grpc +from .api import GRPCInferenceServiceServicer + +_PROCESS_COUNT = multiprocessing.cpu_count() +_THREAD_CONCURRENCY = _PROCESS_COUNT + + +async def _run_server(bind_address): + """Start a server in a subprocess.""" + options = (("grpc.so_reuseport", 1),) + server = grpc.aio.server(futures.ThreadPoolExecutor( + max_workers=_THREAD_CONCURRENCY, + ), + options=options) + ic_pb2_grpc.add_GRPCInferenceServiceServicer_to_server(GRPCInferenceServiceServicer(), server) + server.add_insecure_port(bind_address) + await server.start() + await server.wait_for_termination() + +def run(bind_address): + asyncio.run(_run_server(bind_address)) + + + + +@contextlib.contextmanager +def _reserve_port(port): + """Create a socket for all subprocesses to use.""" + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0: + raise RuntimeError("Failed to set SO_REUSEPORT.") + sock.bind(("", port)) + try: + yield sock.getsockname()[1] + finally: + sock.close() + + +def serve(args): + with _reserve_port(args.grpc_port) as port: + bind_address = "localhost:{}".format(port) + print("Binding to '%s'", bind_address) + sys.stdout.flush() + workers = [] + for _ in range(_PROCESS_COUNT): + # NOTE: It is imperative that the worker subprocesses be forked before + # any gRPC servers start up. See + # https://github.com/grpc/grpc/issues/16001 for more details. 
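+            # Each worker process runs its own asyncio event loop and gRPC AIO
+            # server (see run/_run_server); all workers share the listening
+            # port via the SO_REUSEPORT option reserved above.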
+ worker = multiprocessing.Process( + target=run, args=(bind_address,) + ) + worker.start() + workers.append(worker) + for worker in workers: + worker.join() + + + \ No newline at end of file diff --git a/llm/fastdeploy_ic/utils.py b/llm/fastdeploy_ic/utils.py new file mode 100644 index 0000000000..fc90b8b111 --- /dev/null +++ b/llm/fastdeploy_ic/utils.py @@ -0,0 +1,155 @@ +import os +import contextlib +import logging +import threading +import time +from typing import (Any, Generator, Optional, Union) +from logging.handlers import TimedRotatingFileHandler + +import colorlog + +from fastdeploy_ic.config import GlobalConfig + +global_config = GlobalConfig() + +__all__ = ['get_logger'] + +_LOG_CONFIG = { + 'DEBUG': { + 'color': 'purple' + }, + 'INFO': { + 'color': 'green' + }, + 'WARNING': { + 'color': 'yellow' + }, + 'ERROR': { + 'color': 'red' + }, + 'CRITICAL': { + 'color': 'bold_red' + }, +} + +class Logger(object): + _DEFAULT_NAME: str = 'fastdeploy_ic' + + def __init__(self, + name: Optional[str]=None, + log_file=None, + time_rotation=7, + level=logging.INFO) -> None: + """Initialize the instance based on a given name. + + Args: + name: Logger name. + """ + super().__init__() + if name is None: + name = self._DEFAULT_NAME + self.logger = logging.getLogger(name) + + self.format = colorlog.ColoredFormatter( + "%(log_color)s[%(asctime)-15s] [%(levelname)8s]%(reset)s - %(message)s", + log_colors={ + key: conf['color'] + for key, conf in _LOG_CONFIG.items() + }, ) + + if log_file is not None: + self.handler = TimedRotatingFileHandler( + log_file, + when="midnight", + backupCount=time_rotation, + encoding="utf-8") + else: + self.handler = logging.StreamHandler() + self.handler.setFormatter(self.format) + + self.logger.addHandler(self.handler) + self.logger.setLevel(level) + self.logger.propagate = False + self._is_enabled = True + + def __call__(self, + log_level: int, + msg: object, + *args: object, + **kwargs: Any) -> None: + if not self.is_enabled: + return + + self.logger.log(log_level, msg, *args, **kwargs) + + def debug(self, msg: object, *args: object, **kwargs: Any) -> None: + return self(logging.getLevelName('DEBUG'), msg, *args, **kwargs) + + def info(self, msg: object, *args: object, **kwargs: Any) -> None: + return self(logging.getLevelName('INFO'), msg, *args, **kwargs) + + def warning(self, msg: object, *args: object, **kwargs: Any) -> None: + return self(logging.getLevelName('WARNING'), msg, *args, **kwargs) + + def error(self, msg: object, *args: object, **kwargs: Any) -> None: + return self(logging.getLevelName('ERROR'), msg, *args, **kwargs) + + def critical(self, msg: object, *args: object, **kwargs: Any) -> None: + return self(logging.getLevelName('CRITICAL'), msg, *args, **kwargs) + + def disable(self) -> None: + self._is_enabled = False + + def enable(self) -> None: + self._is_enabled = True + + @property + def is_enabled(self) -> bool: + return self._is_enabled + + def set_level(self, log_level: Union[int, str]) -> None: + self.logger.setLevel(log_level) + + @contextlib.contextmanager + def processing(self, msg: str, + interval: float=0.1) -> Generator[None, None, None]: + """Display a message with spinners. + + Args: + msg: Message to display. + interval: Spinning interval. 
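+
+        A background thread redraws the spinner until the with-block exits.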
+ """ + end = False + + def _printer() -> None: + index = 0 + flags = ['\\', '|', '/', '-'] + while not end: + flag = flags[index % len(flags)] + with self.use_terminator('\r'): + self.info(f"{msg}: {flag}") + time.sleep(interval) + index += 1 + + t = threading.Thread(target=_printer) + t.start() + yield + end = True + + @contextlib.contextmanager + def use_terminator(self, terminator: str) -> Generator[None, None, None]: + old_terminator = self.handler.terminator + self.handler.terminator = terminator + yield + self.handler.terminator = old_terminator + +def get_logger(name, file_name): + """ + Get logger + """ + if not os.path.exists(global_config.log_dir): + os.mkdir(global_config.log_dir) + file_path = os.path.join(global_config.log_dir, file_name) + logger = Logger(name=name, log_file=file_path) + return logger + From 1d5f0e8e040641da4f97dbf9e158ec1ab685aa45 Mon Sep 17 00:00:00 2001 From: chenjian Date: Wed, 8 Nov 2023 11:28:47 +0800 Subject: [PATCH 2/9] add inference load balance controller for llm --- llm_ic/README.md | 27 +++++++++++++++ {llm => llm_ic}/fastdeploy_ic/__init__.py | 0 {llm => llm_ic}/fastdeploy_ic/config.py | 0 .../fastdeploy_ic/data/__init__.py | 0 {llm => llm_ic}/fastdeploy_ic/data/manager.py | 0 .../fastdeploy_ic/proto/__init__.py | 0 {llm => llm_ic}/fastdeploy_ic/proto/ic.proto | 0 {llm => llm_ic}/fastdeploy_ic/proto/ic_pb2.py | 0 .../fastdeploy_ic/proto/ic_pb2_grpc.py | 0 .../fastdeploy_ic/server/__init__.py | 0 {llm => llm_ic}/fastdeploy_ic/server/api.py | 0 .../fastdeploy_ic/server/launcher.py | 0 {llm => llm_ic}/fastdeploy_ic/utils.py | 0 llm_ic/main.py | 9 +++++ llm_ic/requirements.txt | 3 ++ llm_ic/setup.py | 33 +++++++++++++++++++ 16 files changed, 72 insertions(+) create mode 100644 llm_ic/README.md rename {llm => llm_ic}/fastdeploy_ic/__init__.py (100%) rename {llm => llm_ic}/fastdeploy_ic/config.py (100%) rename {llm => llm_ic}/fastdeploy_ic/data/__init__.py (100%) rename {llm => llm_ic}/fastdeploy_ic/data/manager.py (100%) rename {llm => llm_ic}/fastdeploy_ic/proto/__init__.py (100%) rename {llm => llm_ic}/fastdeploy_ic/proto/ic.proto (100%) rename {llm => llm_ic}/fastdeploy_ic/proto/ic_pb2.py (100%) rename {llm => llm_ic}/fastdeploy_ic/proto/ic_pb2_grpc.py (100%) rename {llm => llm_ic}/fastdeploy_ic/server/__init__.py (100%) rename {llm => llm_ic}/fastdeploy_ic/server/api.py (100%) rename {llm => llm_ic}/fastdeploy_ic/server/launcher.py (100%) rename {llm => llm_ic}/fastdeploy_ic/utils.py (100%) create mode 100644 llm_ic/main.py create mode 100644 llm_ic/requirements.txt create mode 100644 llm_ic/setup.py diff --git a/llm_ic/README.md b/llm_ic/README.md new file mode 100644 index 0000000000..45e4a0d91f --- /dev/null +++ b/llm_ic/README.md @@ -0,0 +1,27 @@ +# 大模型服务的负载均衡组件 + +## 环境要求 + +- python >= 3.7 +- 启动好的redis服务,用于作为负载均衡的数据库 + +## 环境变量 +目前所支持的环境变量参考fastdeploy_ic里的config.py + +| 环境变量 | 含义 | +| -------- | ------- | +| REDIS_HOST | redis服务的ip | +| REDIS_PORT | redis服务的port | +| REDIS_USERNAME | redis认证用户 | +| REDIS_PASSWORD | redis认证密码 | +| RESPONSE_TIMEOUT | 获取推理服务流式token的超时时间 | + + +## 启动示例 + +```shell +export REDIS_HOST="localhost" +export REDIS_PORT="6379" +python main.py +``` + diff --git a/llm/fastdeploy_ic/__init__.py b/llm_ic/fastdeploy_ic/__init__.py similarity index 100% rename from llm/fastdeploy_ic/__init__.py rename to llm_ic/fastdeploy_ic/__init__.py diff --git a/llm/fastdeploy_ic/config.py b/llm_ic/fastdeploy_ic/config.py similarity index 100% rename from llm/fastdeploy_ic/config.py rename to llm_ic/fastdeploy_ic/config.py diff --git 
a/llm/fastdeploy_ic/data/__init__.py b/llm_ic/fastdeploy_ic/data/__init__.py similarity index 100% rename from llm/fastdeploy_ic/data/__init__.py rename to llm_ic/fastdeploy_ic/data/__init__.py diff --git a/llm/fastdeploy_ic/data/manager.py b/llm_ic/fastdeploy_ic/data/manager.py similarity index 100% rename from llm/fastdeploy_ic/data/manager.py rename to llm_ic/fastdeploy_ic/data/manager.py diff --git a/llm/fastdeploy_ic/proto/__init__.py b/llm_ic/fastdeploy_ic/proto/__init__.py similarity index 100% rename from llm/fastdeploy_ic/proto/__init__.py rename to llm_ic/fastdeploy_ic/proto/__init__.py diff --git a/llm/fastdeploy_ic/proto/ic.proto b/llm_ic/fastdeploy_ic/proto/ic.proto similarity index 100% rename from llm/fastdeploy_ic/proto/ic.proto rename to llm_ic/fastdeploy_ic/proto/ic.proto diff --git a/llm/fastdeploy_ic/proto/ic_pb2.py b/llm_ic/fastdeploy_ic/proto/ic_pb2.py similarity index 100% rename from llm/fastdeploy_ic/proto/ic_pb2.py rename to llm_ic/fastdeploy_ic/proto/ic_pb2.py diff --git a/llm/fastdeploy_ic/proto/ic_pb2_grpc.py b/llm_ic/fastdeploy_ic/proto/ic_pb2_grpc.py similarity index 100% rename from llm/fastdeploy_ic/proto/ic_pb2_grpc.py rename to llm_ic/fastdeploy_ic/proto/ic_pb2_grpc.py diff --git a/llm/fastdeploy_ic/server/__init__.py b/llm_ic/fastdeploy_ic/server/__init__.py similarity index 100% rename from llm/fastdeploy_ic/server/__init__.py rename to llm_ic/fastdeploy_ic/server/__init__.py diff --git a/llm/fastdeploy_ic/server/api.py b/llm_ic/fastdeploy_ic/server/api.py similarity index 100% rename from llm/fastdeploy_ic/server/api.py rename to llm_ic/fastdeploy_ic/server/api.py diff --git a/llm/fastdeploy_ic/server/launcher.py b/llm_ic/fastdeploy_ic/server/launcher.py similarity index 100% rename from llm/fastdeploy_ic/server/launcher.py rename to llm_ic/fastdeploy_ic/server/launcher.py diff --git a/llm/fastdeploy_ic/utils.py b/llm_ic/fastdeploy_ic/utils.py similarity index 100% rename from llm/fastdeploy_ic/utils.py rename to llm_ic/fastdeploy_ic/utils.py diff --git a/llm_ic/main.py b/llm_ic/main.py new file mode 100644 index 0000000000..06889f03c7 --- /dev/null +++ b/llm_ic/main.py @@ -0,0 +1,9 @@ +import argparse + +from fastdeploy_ic.server.launcher import serve + +if __name__ == "__main__": + parser = argparse.ArgumentParser("Inference load balance controller launcher") + parser.add_argument("--grpc-port", type=int, default=9000) + args = parser.parse_args() + serve(args) \ No newline at end of file diff --git a/llm_ic/requirements.txt b/llm_ic/requirements.txt new file mode 100644 index 0000000000..babf50a95c --- /dev/null +++ b/llm_ic/requirements.txt @@ -0,0 +1,3 @@ +aioredis +colorlog +grpcio \ No newline at end of file diff --git a/llm_ic/setup.py b/llm_ic/setup.py new file mode 100644 index 0000000000..dafda166a2 --- /dev/null +++ b/llm_ic/setup.py @@ -0,0 +1,33 @@ +# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
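+
+# (Editorial note, assumption) Typical local usage: install in editable mode
+# with `pip install -e .`, or build a wheel for distribution with `pip wheel .`.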
+ +import setuptools + +setuptools.setup( + name="fastdeploy-ic", + version="0.0.9", + author="fastdeploy", + author_email="fastdeploy@baidu.com", + description="FastDeploy for Large Language Model", + long_description_content_type="text/plain", + url="https://github.com/PaddlePaddle/FastDeploy", + packages=setuptools.find_packages(), + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + ], + install_requires=["colorlog"], + extras_require={"client": ['grpcio', 'tritonclient']}, + license='Apache 2.0') From 7c93c5903422ffcebfbb1989ca84ac0e033ed481 Mon Sep 17 00:00:00 2001 From: chenjian Date: Wed, 8 Nov 2023 15:56:38 +0800 Subject: [PATCH 3/9] add ic for llm --- llm_ic/fastdeploy_ic/server/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llm_ic/fastdeploy_ic/server/api.py b/llm_ic/fastdeploy_ic/server/api.py index 6b5f22e921..9c13dca1c0 100644 --- a/llm_ic/fastdeploy_ic/server/api.py +++ b/llm_ic/fastdeploy_ic/server/api.py @@ -44,7 +44,7 @@ async def ModelStreamInfer(self, request, context): response_start_time = time.time() while True: if time.time() - response_start_time > global_config.resonpse_timeout: - if data_manager.check_req_id_exist(model_id, req_id): # clear resource about this req + if await data_manager.check_req_id_exist(model_id, req_id): # clear resource about this req await data_manager.remove_request(model_id, request) await data_manager.clear_response(model_id, req_id) await data_manager.remove_req_id_from_map(model_id, req_id) From ad0b0e546900145f4af1a794cdbe1fa2ac6967a3 Mon Sep 17 00:00:00 2001 From: chenjian Date: Wed, 8 Nov 2023 16:49:43 +0800 Subject: [PATCH 4/9] add ic for llm --- llm_ic/fastdeploy_ic/server/api.py | 2 +- llm_ic/fastdeploy_ic/server/launcher.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/llm_ic/fastdeploy_ic/server/api.py b/llm_ic/fastdeploy_ic/server/api.py index 9c13dca1c0..cf1daca5ad 100644 --- a/llm_ic/fastdeploy_ic/server/api.py +++ b/llm_ic/fastdeploy_ic/server/api.py @@ -62,7 +62,7 @@ async def ModelStreamInfer(self, request, context): await data_manager.remove_req_id_from_map(model_id, req_id) return except: - if data_manager.check_req_id_exist(model_id, req_id): # clear resource about this req + if await data_manager.check_req_id_exist(model_id, req_id): # clear resource about this req await data_manager.clear_response(model_id, req_id) await data_manager.remove_req_id_from_map(model_id, req_id) logger.info("ModelStreamInfer: req_id {}: Failed to read response data from inference server".format(req_id)) diff --git a/llm_ic/fastdeploy_ic/server/launcher.py b/llm_ic/fastdeploy_ic/server/launcher.py index 302bad737d..96e8b23dbc 100644 --- a/llm_ic/fastdeploy_ic/server/launcher.py +++ b/llm_ic/fastdeploy_ic/server/launcher.py @@ -52,7 +52,7 @@ def _reserve_port(port): def serve(args): with _reserve_port(args.grpc_port) as port: - bind_address = "localhost:{}".format(port) + bind_address = "0.0.0.0:{}".format(port) print("Binding to '%s'", bind_address) sys.stdout.flush() workers = [] From 5b60c202550e8045ee1ce69d2cc160576f6605da Mon Sep 17 00:00:00 2001 From: chenjian Date: Mon, 20 Nov 2023 16:59:11 +0800 Subject: [PATCH 5/9] add fastdeploy ic for llm --- llm_ic/fastdeploy_ic/server/api.py | 37 +++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/llm_ic/fastdeploy_ic/server/api.py b/llm_ic/fastdeploy_ic/server/api.py index cf1daca5ad..45afa2b083 100644 
--- a/llm_ic/fastdeploy_ic/server/api.py +++ b/llm_ic/fastdeploy_ic/server/api.py @@ -29,7 +29,12 @@ async def ModelStreamInfer(self, request, context): """ try: model_id = request.model_id - req_id = json.loads(request.input)['req_id'] + input_dict = json.loads(request.input) + if 'req_id' not in input_dict: + await context.abort(grpc.StatusCode.INVALID_ARGUMENT, "ModelStreamInfer: there is no req_id in request") + if 'ic_req_data' not in input_dict: + await context.abort(grpc.StatusCode.INVALID_ARGUMENT, "ModelStreamInfer: there is no ic_req_data in request") + req_id = input_dict['req_id'] # Check whether req_id is repeated # Warning: We only simply check whether there is any same req_id has been in, # but we can not prevent two requests with the same req_id coming simultaneously. @@ -54,10 +59,18 @@ async def ModelStreamInfer(self, request, context): if data is None: await asyncio.sleep(1) continue - logger.info("ModelStreamInfer: req_id {}: response data: {}".format(req_id, data)) - yield data try: - if json.loads(data.output)['is_end'] == 1: # this request is done + output_dict = json.loads(data.output) + if time.time() - output_dict['ic_timestamp_tag'] > global_config.resonpse_timeout: # the response is invalid because of timeout, even maybe from previous request with same req_id + continue + del output_dict['ic_timestamp_tag'] + data.output = json.dumps(output_dict) + logger.info("ModelStreamInfer: req_id {}: response data: {}".format(req_id, data)) + yield data + # two cases denote the request is done + # 1. something error returned by server, but not normal result + # 2. is_end is 1 + if ('is_end' not in output_dict) or (output_dict['is_end'] == 1): # clear resource about this req, only req_id in map should be removed await data_manager.remove_req_id_from_map(model_id, req_id) return @@ -116,12 +129,18 @@ async def ModelSendResponse(self, response_iterator, context): res = json.loads(response.output) model_id = res['ic_req_data']['model_id'] req_id = res['req_id'] + # add timestamp for response + res['ic_timestamp_tag'] = time.time() # we add this to prevent that client recieves + # response for previous request due to: + # 1. use the same req_id by mistake + # 2. the client corresponding to previous request did not recieve all responses for some reason + response.output = json.dumps(res) except: logger.info("ModelSendResponse: req_id {}: Failed to read response data from inference server".format(req_id)) await context.abort(grpc.StatusCode.INTERNAL, "ModelSendResponse: req_id {}: Failed to read response data from inference server".format(req_id)) await data_manager.enque_response(model_id, req_id, response) logger.info("ModelSendResponse: req_id {}: response data: {}".format(req_id, res)) - if res['is_end'] == 1: + if ('is_end' not in res) or (res['is_end'] == 1): return ic_pb2.ModelSendResponseResult() if time.time() - response_start_time > global_config.resonpse_timeout: await data_manager.clear_response(model_id, req_id) @@ -146,12 +165,18 @@ async def ModelSendResponseList(self, response_list_iterator, context): res = json.loads(response.output) model_id = res['ic_req_data']['model_id'] req_id = res['req_id'] + # add timestamp for response + res['ic_timestamp_tag'] = time.time() # we add this to prevent that client recieves + # response for previous request due to: + # 1. use the same req_id by mistake + # 2. 
the client corresponding to previous request did not recieve all responses for some reason + response.output = json.dumps(res) except: logger.info("ModelSendResponseList: req_id {}: Failed to read response data from inference server".format(req_id)) await context.abort(grpc.StatusCode.INTERNAL, "ModelSendResponseList: req_id {}: Failed to read response data from inference server".format(req_id)) await data_manager.enque_response(model_id, req_id, response) logger.info("ModelSendResponseList: req_id {}: response data: {}".format(req_id, res)) - if res['is_end'] == 1: + if ('is_end' not in res) or (res['is_end'] == 1): break if time.time() - response_start_time > global_config.resonpse_timeout: await data_manager.clear_response(model_id, req_id) From 7efb041487b263eba770a479adce4c566285a089 Mon Sep 17 00:00:00 2001 From: chenjian Date: Tue, 21 Nov 2023 10:22:14 +0800 Subject: [PATCH 6/9] add fastdeploy ic to llm --- llm_ic/fastdeploy_ic/server/api.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/llm_ic/fastdeploy_ic/server/api.py b/llm_ic/fastdeploy_ic/server/api.py index 45afa2b083..0da1319844 100644 --- a/llm_ic/fastdeploy_ic/server/api.py +++ b/llm_ic/fastdeploy_ic/server/api.py @@ -3,6 +3,7 @@ import grpc import json import asyncio +from aioredis import RedisError import fastdeploy_ic.proto.ic_pb2_grpc as ic_pb2_grpc import fastdeploy_ic.proto.ic_pb2 as ic_pb2 @@ -61,11 +62,12 @@ async def ModelStreamInfer(self, request, context): continue try: output_dict = json.loads(data.output) - if time.time() - output_dict['ic_timestamp_tag'] > global_config.resonpse_timeout: # the response is invalid because of timeout, even maybe from previous request with same req_id - continue - del output_dict['ic_timestamp_tag'] - data.output = json.dumps(output_dict) - logger.info("ModelStreamInfer: req_id {}: response data: {}".format(req_id, data)) + if 'ic_timestamp_tag' in output_dict: + if time.time() - output_dict['ic_timestamp_tag'] > global_config.resonpse_timeout: # the response is invalid because of timeout, even maybe from previous request with same req_id + continue + del output_dict['ic_timestamp_tag'] + data.output = json.dumps(output_dict) + logger.info("ModelStreamInfer: req_id {}: response data: {}".format(req_id, output_dict)) yield data # two cases denote the request is done # 1. 
something error returned by server, but not normal result @@ -74,13 +76,13 @@ async def ModelStreamInfer(self, request, context): # clear resource about this req, only req_id in map should be removed await data_manager.remove_req_id_from_map(model_id, req_id) return - except: + except Exception as e: if await data_manager.check_req_id_exist(model_id, req_id): # clear resource about this req await data_manager.clear_response(model_id, req_id) await data_manager.remove_req_id_from_map(model_id, req_id) - logger.info("ModelStreamInfer: req_id {}: Failed to read response data from inference server".format(req_id)) + logger.info("ModelStreamInfer: req_id {}: Failed to read response data from inference server, exception {}".format(req_id, e)) await context.abort(grpc.StatusCode.INTERNAL, "ModelStreamInfer: req_id {}: Failed to read response data from inference server".format(req_id)) - except Exception as e: + except RedisError as e: # if redis operation failed, should arrive here # Log the error message, and signal users internal error (we can not expose origin redis error to users) logger.info("ModelStreamInfer: exception: {}".format(e)) @@ -109,7 +111,7 @@ async def ModelFetchRequest(self, request, context): fetch_request_result = ic_pb2.ModelFetchRequestResult() fetch_request_result.requests.extend(requests) logger.info("ModelFetchRequest: return requests: {}".format(requests)) - except Exception as e: + except RedisError as e: # if operation failed, should arrive here # Log the error message, and signal users internal error logger.info("ModelFetchRequest: exception: {}".format(e)) @@ -146,7 +148,7 @@ async def ModelSendResponse(self, response_iterator, context): await data_manager.clear_response(model_id, req_id) logger.info("ModelSendResponse: req_id {}: Get response from inference server timeout".format(req_id)) await context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, "ModelSendResponse: req_id {}: Get response from inference server timeout".format(req_id)) - except Exception as e: + except RedisError as e: # if operation failed, should arrive here # Log the error message, and signal users internal error logger.info("ModelSendResponse: exception: {}".format(e)) @@ -182,7 +184,7 @@ async def ModelSendResponseList(self, response_list_iterator, context): await data_manager.clear_response(model_id, req_id) logger.info("ModelSendResponseList: req_id {}: Get response from inference server timeout".format(req_id)) await context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, "ModelSendResponseList: req_id {}: Get response from inference server timeout".format(req_id)) - except Exception as e: + except RedisError as e: # if operation failed, should arrive here # Log the error message, and signal users internal error logger.info("ModelSendResponseList: exception: {}".format(e)) From eb08074667e91fa8bde3e16e6872358fd26de682 Mon Sep 17 00:00:00 2001 From: chenjian Date: Wed, 6 Dec 2023 19:44:22 +0800 Subject: [PATCH 7/9] Fix asyncio.CancelError exception --- llm_ic/fastdeploy_ic/config.py | 4 ++++ llm_ic/fastdeploy_ic/data/manager.py | 30 ++++++++++++++++++++++--- llm_ic/fastdeploy_ic/server/api.py | 4 +++- llm_ic/fastdeploy_ic/server/launcher.py | 4 +++- 4 files changed, 37 insertions(+), 5 deletions(-) diff --git a/llm_ic/fastdeploy_ic/config.py b/llm_ic/fastdeploy_ic/config.py index b7f75f5c31..4fcf1e308e 100644 --- a/llm_ic/fastdeploy_ic/config.py +++ b/llm_ic/fastdeploy_ic/config.py @@ -1,4 +1,5 @@ import os +import multiprocessing import json class GlobalConfig(): @@ -21,6 +22,9 @@ def __init__(self): 
# Response self.resonpse_timeout = int(os.getenv('RESPONSE_TIMEOUT', default="120")) + # Server + self.num_process = int(os.getenv('NUM_PROCESS', default=multiprocessing.cpu_count())) + # Logger self.log_dir = os.getenv('IC_LOG_DIR', default='ic_logs') \ No newline at end of file diff --git a/llm_ic/fastdeploy_ic/data/manager.py b/llm_ic/fastdeploy_ic/data/manager.py index d07c209fc6..921e9b0ae3 100644 --- a/llm_ic/fastdeploy_ic/data/manager.py +++ b/llm_ic/fastdeploy_ic/data/manager.py @@ -1,6 +1,7 @@ import json import math +import asyncio import aioredis @@ -9,26 +10,44 @@ logger = get_logger("data_manager", "ic_data_manager.log") +__retry_times = 5 # redis client may have unexpected errors, we retry it with respect to some errors +def retry_wrapper(f): + async def wrapper(*args, **kwargs): + for i in range(__retry_times): + try: + return await f(*args, **kwargs) + except asyncio.CancelledError: + logger.info("{} occured asyncio.CancelledError, retry times: {}".format(f.__name__, i+1)) + continue + return wrapper + + + class DataManager: def __init__(self, redis_conf) -> None: self.client = aioredis.Redis(**redis_conf) self.internal_check_key_prefix = '__keymap_' + @retry_wrapper async def check_req_id_exist(self, model_id, req_id): key = '{}{}'.format(self.internal_check_key_prefix, model_id) logger.info("check_req_id_exist: key: {} value: {}".format(key, req_id)) - await self.client.sismember(key, req_id) + is_exist = await self.client.sismember(key, req_id) + return is_exist + @retry_wrapper async def add_req_id_to_map(self, model_id, req_id): key = '{}{}'.format(self.internal_check_key_prefix, model_id) logger.info("add_req_id_to_map: key: {} value: {}".format(key, req_id)) await self.client.sadd(key, req_id) + @retry_wrapper async def remove_req_id_from_map(self, model_id, req_id): key = '{}{}'.format(self.internal_check_key_prefix, model_id) logger.info("remove_req_id_from_map: key: {} value: {}".format(key, req_id)) await self.client.srem(key, req_id) + @retry_wrapper async def enque_request(self, model_id, req, to_end=True): serialized_req = req.SerializeToString() # key = model_id @@ -37,19 +56,22 @@ async def enque_request(self, model_id, req, to_end=True): await self.client.rpush(model_id, serialized_req) else: await self.client.lpush(model_id, serialized_req) - + + @retry_wrapper async def deque_request(self, model_id): data = await self.client.lpop(model_id) if data is not None: data = ic_pb2.ModelInferRequest.FromString(data) logger.info("deque_request: key: {} value: {}".format(model_id, data)) return data - + + @retry_wrapper async def remove_request(self, model_id, req): serialized_req = req.SerializeToString() logger.info("remove_request: key: {} value: {}".format(model_id, req)) await self.client.lrem(model_id, 1, serialized_req) + @retry_wrapper async def enque_response(self, model_id, req_id, res, to_end=True): serialized_res = res.SerializeToString() key = '{}/{}'.format(model_id, req_id) @@ -59,6 +81,7 @@ async def enque_response(self, model_id, req_id, res, to_end=True): else: await self.client.lpush(key, serialized_res) + @retry_wrapper async def deque_response(self, model_id, req_id): key = '{}/{}'.format(model_id, req_id) data = await self.client.lpop(key) @@ -67,6 +90,7 @@ async def deque_response(self, model_id, req_id): logger.info("deque_response: key: {} value: {}".format(key, data)) return data + @retry_wrapper async def clear_response(self, model_id, req_id): key = '{}/{}'.format(model_id, req_id) logger.info("clear_response: key: {}".format(key)) diff 
--git a/llm_ic/fastdeploy_ic/server/api.py b/llm_ic/fastdeploy_ic/server/api.py index 0da1319844..ab0d48ae24 100644 --- a/llm_ic/fastdeploy_ic/server/api.py +++ b/llm_ic/fastdeploy_ic/server/api.py @@ -45,6 +45,7 @@ async def ModelStreamInfer(self, request, context): logger.info("ModelStreamInfer: req_id {}: has existed in other task".format(req_id)) await context.abort(grpc.StatusCode.INVALID_ARGUMENT, "ModelStreamInfer: req_id {}: has existed in other task".format(req_id)) # 1. push request to redis + await data_manager.add_req_id_to_map(model_id, req_id) await data_manager.enque_request(model_id, request) # 2. response stream results response_start_time = time.time() @@ -76,6 +77,7 @@ async def ModelStreamInfer(self, request, context): # clear resource about this req, only req_id in map should be removed await data_manager.remove_req_id_from_map(model_id, req_id) return + except Exception as e: if await data_manager.check_req_id_exist(model_id, req_id): # clear resource about this req await data_manager.clear_response(model_id, req_id) @@ -162,7 +164,7 @@ async def ModelSendResponseList(self, response_list_iterator, context): try: response_start_time = time.time() async for response_list in response_list_iterator: - for response in response_list: + for response in response_list.response_list: try: res = json.loads(response.output) model_id = res['ic_req_data']['model_id'] diff --git a/llm_ic/fastdeploy_ic/server/launcher.py b/llm_ic/fastdeploy_ic/server/launcher.py index 96e8b23dbc..16ee3998f2 100644 --- a/llm_ic/fastdeploy_ic/server/launcher.py +++ b/llm_ic/fastdeploy_ic/server/launcher.py @@ -12,9 +12,11 @@ import grpc import fastdeploy_ic.proto.ic_pb2_grpc as ic_pb2_grpc +from fastdeploy_ic.config import GlobalConfig from .api import GRPCInferenceServiceServicer -_PROCESS_COUNT = multiprocessing.cpu_count() +global_config = GlobalConfig() +_PROCESS_COUNT = global_config.num_process _THREAD_CONCURRENCY = _PROCESS_COUNT From 129537c7eb37ea0a9759d561b40a9887c3752887 Mon Sep 17 00:00:00 2001 From: chenjian Date: Mon, 18 Dec 2023 17:24:44 +0800 Subject: [PATCH 8/9] do not block ic by same req_id --- llm_ic/fastdeploy_ic/server/api.py | 7 ++++--- llm_ic/requirements.txt | 3 ++- llm_ic/setup.py | 4 ++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/llm_ic/fastdeploy_ic/server/api.py b/llm_ic/fastdeploy_ic/server/api.py index ab0d48ae24..86ef27e0ea 100644 --- a/llm_ic/fastdeploy_ic/server/api.py +++ b/llm_ic/fastdeploy_ic/server/api.py @@ -41,12 +41,13 @@ async def ModelStreamInfer(self, request, context): # but we can not prevent two requests with the same req_id coming simultaneously. # To achieve this, we should add lock to query and insert query into redis, which will influence performance. # Currently, we assume different req_ids are confirmed by users. - if await data_manager.check_req_id_exist(model_id, req_id): - logger.info("ModelStreamInfer: req_id {}: has existed in other task".format(req_id)) - await context.abort(grpc.StatusCode.INVALID_ARGUMENT, "ModelStreamInfer: req_id {}: has existed in other task".format(req_id)) + # if await data_manager.check_req_id_exist(model_id, req_id): + # logger.info("ModelStreamInfer: req_id {}: has existed in other task".format(req_id)) + # await context.abort(grpc.StatusCode.INVALID_ARGUMENT, "ModelStreamInfer: req_id {}: has existed in other task".format(req_id)) # 1. 
From 129537c7eb37ea0a9759d561b40a9887c3752887 Mon Sep 17 00:00:00 2001
From: chenjian
Date: Mon, 18 Dec 2023 17:24:44 +0800
Subject: [PATCH 8/9] do not block ic by same req_id

---
 llm_ic/fastdeploy_ic/server/api.py | 7 ++++---
 llm_ic/requirements.txt            | 3 ++-
 llm_ic/setup.py                    | 4 ++--
 3 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/llm_ic/fastdeploy_ic/server/api.py b/llm_ic/fastdeploy_ic/server/api.py
index ab0d48ae24..86ef27e0ea 100644
--- a/llm_ic/fastdeploy_ic/server/api.py
+++ b/llm_ic/fastdeploy_ic/server/api.py
@@ -41,12 +41,13 @@ async def ModelStreamInfer(self, request, context):
         # but we can not prevent two requests with the same req_id coming simultaneously.
         # To achieve this, we should add lock to query and insert query into redis, which will influence performance.
         # Currently, we assume different req_ids are confirmed by users.
-        if await data_manager.check_req_id_exist(model_id, req_id):
-            logger.info("ModelStreamInfer: req_id {}: has existed in other task".format(req_id))
-            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, "ModelStreamInfer: req_id {}: has existed in other task".format(req_id))
+        # if await data_manager.check_req_id_exist(model_id, req_id):
+        #     logger.info("ModelStreamInfer: req_id {}: has existed in other task".format(req_id))
+        #     await context.abort(grpc.StatusCode.INVALID_ARGUMENT, "ModelStreamInfer: req_id {}: has existed in other task".format(req_id))
         # 1. push request to redis
         await data_manager.add_req_id_to_map(model_id, req_id)
         await data_manager.enque_request(model_id, request)
+        logger.info("ModelStreamInfer: req_id {}: enqued request".format(req_id))
         # 2. response stream results
         response_start_time = time.time()
         while True:
diff --git a/llm_ic/requirements.txt b/llm_ic/requirements.txt
index babf50a95c..d809ebeac7 100644
--- a/llm_ic/requirements.txt
+++ b/llm_ic/requirements.txt
@@ -1,3 +1,4 @@
 aioredis
 colorlog
-grpcio
\ No newline at end of file
+grpcio
+protobuf
\ No newline at end of file
diff --git a/llm_ic/setup.py b/llm_ic/setup.py
index dafda166a2..0b9b0ba0b5 100644
--- a/llm_ic/setup.py
+++ b/llm_ic/setup.py
@@ -15,7 +15,7 @@
 import setuptools
 
 setuptools.setup(
-    name="fastdeploy-ic",
+    name="fastdeploy_ic",
     version="0.0.9",
     author="fastdeploy",
     author_email="fastdeploy@baidu.com",
@@ -28,6 +28,6 @@
         "License :: OSI Approved :: Apache Software License",
         "Operating System :: OS Independent",
     ],
-    install_requires=["colorlog"],
+    install_requires=["colorlog", "aioredis", "grpcio", "protobuf"],
     extras_require={"client": ['grpcio', 'tritonclient']},
     license='Apache 2.0')

From 2b9b3f0d4d5f95b4e9b4c9bd4cae4886365a3751 Mon Sep 17 00:00:00 2001
From: chenjian
Date: Tue, 19 Dec 2023 14:39:45 +0800
Subject: [PATCH 9/9] update stability for redis client

---
 llm_ic/fastdeploy_ic/data/manager.py | 12 ++++++++
 llm_ic/fastdeploy_ic/server/api.py   | 44 ++++++++++++++++------------
 2 files changed, 37 insertions(+), 19 deletions(-)

diff --git a/llm_ic/fastdeploy_ic/data/manager.py b/llm_ic/fastdeploy_ic/data/manager.py
index 921e9b0ae3..002e5fdf72 100644
--- a/llm_ic/fastdeploy_ic/data/manager.py
+++ b/llm_ic/fastdeploy_ic/data/manager.py
@@ -19,15 +19,27 @@ async def wrapper(*args, **kwargs):
             except asyncio.CancelledError:
                 logger.info("{} occured asyncio.CancelledError, retry times: {}".format(f.__name__, i+1))
                 continue
+            except aioredis.ConnectionError:
+                args[0].renew_client()
+                logger.info("{} occured aioredis.ConnectionError, retry times: {}".format(f.__name__, i+1))
+                continue
+            except aioredis.TimeoutError:
+                args[0].renew_client()
+                logger.info("{} occured aioredis.TimeoutError, retry times: {}".format(f.__name__, i+1))
+                continue
     return wrapper
 
 
 
 class DataManager:
     def __init__(self, redis_conf) -> None:
+        self.redis_conf = redis_conf
         self.client = aioredis.Redis(**redis_conf)
         self.internal_check_key_prefix = '__keymap_'
 
+    def renew_client(self):
+        self.client = aioredis.Redis(**self.redis_conf)
+
     @retry_wrapper
     async def check_req_id_exist(self, model_id, req_id):
         key = '{}{}'.format(self.internal_check_key_prefix, model_id)
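In the revised decorator, aioredis.ConnectionError and aioredis.TimeoutError also trigger a retry, after args[0].renew_client() rebuilds the client from the stored redis_conf. A standalone, illustrative sketch of that renew-then-retry idea; RedisHandle and safe_llen are made-up names, not the patch's DataManager:

    import asyncio

    import aioredis

    class RedisHandle:
        def __init__(self, conf):
            self.conf = conf
            self.client = aioredis.Redis(**conf)

        def renew(self):
            # Drop the possibly broken connection pool and start a fresh one.
            self.client = aioredis.Redis(**self.conf)

        async def safe_llen(self, key, retries=5):
            for attempt in range(1, retries + 1):
                try:
                    return await self.client.llen(key)
                except (aioredis.ConnectionError, aioredis.TimeoutError):
                    self.renew()
                    await asyncio.sleep(0.1 * attempt)  # brief backoff before the next attempt
            return None  # retries exhausted; the caller decides how to degrade

Keeping redis_conf on the instance is what makes the renewal possible: a fresh aioredis.Redis pool can be built at any point without threading the configuration through every call site.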
diff --git a/llm_ic/fastdeploy_ic/server/api.py b/llm_ic/fastdeploy_ic/server/api.py
index 86ef27e0ea..b165d6ec13 100644
--- a/llm_ic/fastdeploy_ic/server/api.py
+++ b/llm_ic/fastdeploy_ic/server/api.py
@@ -21,9 +21,11 @@
     'username': global_config.redis_username,
     'password': global_config.redis_password
 }
-data_manager = DataManager(redis_config)
 
 class GRPCInferenceServiceServicer(ic_pb2_grpc.GRPCInferenceServiceServicer):
+    def __init__(self):
+        self.data_manager = DataManager(redis_config)
+
     async def ModelStreamInfer(self, request, context):
         """
         Provided for request sender.
@@ -41,24 +43,24 @@ async def ModelStreamInfer(self, request, context):
         # but we can not prevent two requests with the same req_id coming simultaneously.
         # To achieve this, we should add lock to query and insert query into redis, which will influence performance.
         # Currently, we assume different req_ids are confirmed by users.
-        # if await data_manager.check_req_id_exist(model_id, req_id):
+        # if await self.data_manager.check_req_id_exist(model_id, req_id):
         #     logger.info("ModelStreamInfer: req_id {}: has existed in other task".format(req_id))
         #     await context.abort(grpc.StatusCode.INVALID_ARGUMENT, "ModelStreamInfer: req_id {}: has existed in other task".format(req_id))
         # 1. push request to redis
-        await data_manager.add_req_id_to_map(model_id, req_id)
-        await data_manager.enque_request(model_id, request)
+        await self.data_manager.add_req_id_to_map(model_id, req_id)
+        await self.data_manager.enque_request(model_id, request)
         logger.info("ModelStreamInfer: req_id {}: enqued request".format(req_id))
         # 2. response stream results
         response_start_time = time.time()
         while True:
             if time.time() - response_start_time > global_config.resonpse_timeout:
-                if await data_manager.check_req_id_exist(model_id, req_id): # clear resource about this req
-                    await data_manager.remove_request(model_id, request)
-                    await data_manager.clear_response(model_id, req_id)
-                    await data_manager.remove_req_id_from_map(model_id, req_id)
+                if await self.data_manager.check_req_id_exist(model_id, req_id): # clear resource about this req
+                    await self.data_manager.remove_request(model_id, request)
+                    await self.data_manager.clear_response(model_id, req_id)
+                    await self.data_manager.remove_req_id_from_map(model_id, req_id)
                 logger.info("ModelStreamInfer: req_id {}: Get response from inference server timeout".format(req_id))
                 await context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, "ModelStreamInfer: req_id {}: Get response from inference server timeout".format(req_id))
-            data = await data_manager.deque_response(model_id, req_id)
+            data = await self.data_manager.deque_response(model_id, req_id)
             if data is None:
                 await asyncio.sleep(1)
                 continue
@@ -76,13 +78,13 @@ async def ModelStreamInfer(self, request, context):
                 # 2. is_end is 1
                 if ('is_end' not in output_dict) or (output_dict['is_end'] == 1):
                     # clear resource about this req, only req_id in map should be removed
-                    await data_manager.remove_req_id_from_map(model_id, req_id)
+                    await self.data_manager.remove_req_id_from_map(model_id, req_id)
                     return
 
         except Exception as e:
-            if await data_manager.check_req_id_exist(model_id, req_id): # clear resource about this req
-                await data_manager.clear_response(model_id, req_id)
-                await data_manager.remove_req_id_from_map(model_id, req_id)
+            if await self.data_manager.check_req_id_exist(model_id, req_id): # clear resource about this req
+                await self.data_manager.clear_response(model_id, req_id)
+                await self.data_manager.remove_req_id_from_map(model_id, req_id)
             logger.info("ModelStreamInfer: req_id {}: Failed to read response data from inference server, exception {}".format(req_id, e))
             await context.abort(grpc.StatusCode.INTERNAL, "ModelStreamInfer: req_id {}: Failed to read response data from inference server".format(req_id))
         except RedisError as e:
@@ -90,6 +92,10 @@ async def ModelStreamInfer(self, request, context):
             # Log the error message, and signal users internal error (we can not expose origin redis error to users)
             logger.info("ModelStreamInfer: exception: {}".format(e))
             await context.abort(grpc.StatusCode.INTERNAL, "Internal error happened")
+
+        except Exception as e:
+            logger.info("ModelStreamInfer: exception: type {}: {}".format(type(e), e))
+            await context.abort(grpc.StatusCode.INTERNAL, "Internal error happened")
 
     async def ModelFetchRequest(self, request, context):
         """
@@ -104,11 +110,11 @@ async def ModelFetchRequest(self, request, context):
         requests = []
         for model_id in model_ids:
             if strategy == ic_pb2.FetchStrategy.ByRequest:
-                requests.extend(await data_manager.get_requests_by_number(model_id, request.max_request_num))
+                requests.extend(await self.data_manager.get_requests_by_number(model_id, request.max_request_num))
             else:
                 by_token_params = request.by_token_params
-                requests.extend(await data_manager.get_requests_by_block(model_id, request.max_request_num,
+                requests.extend(await self.data_manager.get_requests_by_block(model_id, request.max_request_num,
                                     by_token_params.block_num, by_token_params.block_size, by_token_params.dec_token_num))
 
         fetch_request_result = ic_pb2.ModelFetchRequestResult()
@@ -143,12 +149,12 @@ async def ModelSendResponse(self, response_iterator, context):
                 except:
                     logger.info("ModelSendResponse: req_id {}: Failed to read response data from inference server".format(req_id))
                     await context.abort(grpc.StatusCode.INTERNAL, "ModelSendResponse: req_id {}: Failed to read response data from inference server".format(req_id))
-                await data_manager.enque_response(model_id, req_id, response)
+                await self.data_manager.enque_response(model_id, req_id, response)
                 logger.info("ModelSendResponse: req_id {}: response data: {}".format(req_id, res))
                 if ('is_end' not in res) or (res['is_end'] == 1):
                     return ic_pb2.ModelSendResponseResult()
                 if time.time() - response_start_time > global_config.resonpse_timeout:
-                    await data_manager.clear_response(model_id, req_id)
+                    await self.data_manager.clear_response(model_id, req_id)
                     logger.info("ModelSendResponse: req_id {}: Get response from inference server timeout".format(req_id))
                     await context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, "ModelSendResponse: req_id {}: Get response from inference server timeout".format(req_id))
         except RedisError as e:
@@ -179,12 +185,12 @@ async def ModelSendResponseList(self, response_list_iterator, context):
                     except:
logger.info("ModelSendResponseList: req_id {}: Failed to read response data from inference server".format(req_id)) await context.abort(grpc.StatusCode.INTERNAL, "ModelSendResponseList: req_id {}: Failed to read response data from inference server".format(req_id)) - await data_manager.enque_response(model_id, req_id, response) + await self.data_manager.enque_response(model_id, req_id, response) logger.info("ModelSendResponseList: req_id {}: response data: {}".format(req_id, res)) if ('is_end' not in res) or (res['is_end'] == 1): break if time.time() - response_start_time > global_config.resonpse_timeout: - await data_manager.clear_response(model_id, req_id) + await self.data_manager.clear_response(model_id, req_id) logger.info("ModelSendResponseList: req_id {}: Get response from inference server timeout".format(req_id)) await context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, "ModelSendResponseList: req_id {}: Get response from inference server timeout".format(req_id)) except RedisError as e: