[Bug] API streaming batch bug: only one result is returned, then the client hangs and has to be force-cancelled by timeout #3144

Open
3 tasks done
xiezhipeng-git opened this issue Feb 15, 2025 · 11 comments

xiezhipeng-git commented Feb 15, 2025

Checklist

  • 1. I have searched related issues but cannot get the expected help.
  • 2. The bug has not been fixed in the latest version.
  • 3. Please note that if the bug-related issue you submitted lacks corresponding environment info and a minimal reproducible demo, it will be challenging for us to reproduce and resolve the issue, reducing the likelihood of receiving feedback.

Describe the bug

lmdeploy command

python -m lmdeploy serve api_server "/mnt/d/Users/Admin/.cache//kagglehub/models/shelterw/deepseek-r1/transformers/deepseek-r1-distill-qwen-14b-awq/1" --server-name="127.0.0.1" --server-port=45001 --model-name="1" --quant-policy=8 --session-len=32768 --tp=1 --cache-max-entry-count=0.9  --backend=pytorch 

vLLM command

python -m vllm.entrypoints.openai.api_server --model="/mnt/d/Users/Admin/.cache//kagglehub/models/shelterw/deepseek-r1/transformers/deepseek-r1-distill-qwen-14b-awq/1" --served-model-name="1" --trust-remote-code --host=127.0.0.1 --port=45001 --tensor-parallel-size=1 --gpu-memory-utilization=0.9 --max-num-seqs=256 --enforce-eager --max-model-len=16384

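I started an OpenAI-compatible API server with vLLM and with lmdeploy in turn, then ran streaming parallel inference against each. vLLM works correctly. With lmdeploy, only the first request finishes normally and then the program hangs; after cancelling I get the first result, and none of the other requests ever started.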

Reproduction

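The code below may not run as-is. My real code is fairly complex, so I had an AI write a simplified version. The goal in both cases is streaming parallel inference.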

import openai
from typing import List

def stream_batch_inference(prompts: List[str], api_key: str, model_name: str = "1") -> List[str]:
    """
    Standard streaming batch request example,
    with basic error handling and a timeout.
    """
    client = openai.OpenAI(api_key=api_key)
    results = ["" for _ in prompts]
    
    try:
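        # Create the streaming request.
        # NOTE: `messages` here is a list of conversations (list of list of dict);
        # per the maintainers' reply further down, the chat endpoint accepts only
        # one conversation (list of dict) per request.
        response = client.chat.completions.create(
            model=model_name,
            messages=[[{"role": "user", "content": p}] for p in prompts],
            stream=True,
            timeout=30  # timeout for the whole request
        )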

        # Handle the streaming response
        for chunk in response:
            if not chunk.choices:
                continue

            # `chunk.choices` is a list, so handle each choice in it
            for choice in chunk.choices:
                if choice.delta and choice.delta.content:
                    results[choice.index] += choice.delta.content

    except openai.APITimeoutError as e:
        print(f"API request timed out: {e}")
        return [f"Error: Timeout - {str(e)}" for _ in prompts]
        
    except openai.APIError as e:
        print(f"API error: {e}")
        return [f"Error: API Error - {str(e)}" for _ in prompts]

    return results

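# Usage example
if __name__ == "__main__":
    api_key = "your-api-key"  # replace with a real API key
    test_prompts = [
        "Explain the basic principles of quantum computing",
        "Write a hello world program in Python",
        "What is the capital of France?"
    ]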
    
    responses = stream_batch_inference(test_prompts, api_key)
    for i, (prompt, response) in enumerate(zip(test_prompts, responses)):
        print(f"Prompt {i+1}: {prompt}")
        print(f"Response: {response}\n{'-'*40}")

Environment

lmdeploy check_env
sys.platform: linux
Python: 3.12.7 | packaged by Anaconda, Inc. | (main, Oct  4 2024, 13:27:36) [GCC 11.2.0]
CUDA available: True
MUSA available: False
numpy_random_seed: 2147483648
GPU 0: NVIDIA GeForce RTX 4090
CUDA_HOME: /usr/local/cuda-12.6
NVCC: Cuda compilation tools, release 12.6, V12.6.77
GCC: gcc (Ubuntu 13.3.0-6ubuntu2~24.04) 13.3.0
PyTorch: 2.5.1+cu124
PyTorch compiling details: PyTorch built with:
  - GCC 9.3
  - C++ Version: 201703
  - Intel(R) oneAPI Math Kernel Library Version 2023.1-Product Build 20230303 for Intel(R) 64 architecture applications
  - Intel(R) MKL-DNN v3.5.3 (Git Hash 66f0cb9eb66affd2da3bf5f8d897376f04aae6af)
  - OpenMP 201511 (a.k.a. OpenMP 4.5)
  - LAPACK is enabled (usually provided by MKL)
  - NNPACK is enabled
  - CPU capability usage: AVX2
  - CUDA Runtime 12.4
  - NVCC architecture flags: -gencode;arch=compute_50,code=sm_50;-gencode;arch=compute_60,code=sm_60;-gencode;arch=compute_70,code=sm_70;-gencode;arch=compute_75,code=sm_75;-gencode;arch=compute_80,code=sm_80;-gencode;arch=compute_86,code=sm_86;-gencode;arch=compute_90,code=sm_90
  - CuDNN 90.1
  - Magma 2.6.1
  - Build settings: BLAS_INFO=mkl, BUILD_TYPE=Release, CUDA_VERSION=12.4, CUDNN_VERSION=9.1.0, CXX_COMPILER=/opt/rh/devtoolset-9/root/usr/bin/c++, CXX_FLAGS= -D_GLIBCXX_USE_CXX11_ABI=0 -fabi-version=11 -fvisibility-inlines-hidden -DUSE_PTHREADPOOL -DNDEBUG -DUSE_KINETO -DLIBKINETO_NOROCTRACER -DLIBKINETO_NOXPUPTI=ON -DUSE_FBGEMM -DUSE_PYTORCH_QNNPACK -DUSE_XNNPACK -DSYMBOLICATE_MOBILE_DEBUG_HANDLE -O2 -fPIC -Wall -Wextra -Werror=return-type -Werror=non-virtual-dtor -Werror=bool-operation -Wnarrowing -Wno-missing-field-initializers -Wno-type-limits -Wno-array-bounds -Wno-unknown-pragmas -Wno-unused-parameter -Wno-strict-overflow -Wno-strict-aliasing -Wno-stringop-overflow -Wsuggest-override -Wno-psabi -Wno-error=old-style-cast -Wno-missing-braces -fdiagnostics-color=always -faligned-new -Wno-unused-but-set-variable -Wno-maybe-uninitialized -fno-math-errno -fno-trapping-math -Werror=format -Wno-stringop-overflow, LAPACK_INFO=mkl, PERF_WITH_AVX=1, PERF_WITH_AVX2=1, TORCH_VERSION=2.5.1, USE_CUDA=ON, USE_CUDNN=ON, USE_CUSPARSELT=1, USE_EXCEPTION_PTR=1, USE_GFLAGS=OFF, USE_GLOG=OFF, USE_GLOO=ON, USE_MKL=ON, USE_MKLDNN=ON, USE_MPI=OFF, USE_NCCL=1, USE_NNPACK=ON, USE_OPENMP=ON, USE_ROCM=OFF, USE_ROCM_KERNEL_ASSERT=OFF, 

TorchVision: 0.20.1+cu124
LMDeploy: 0.7.0.post3+c7581f6
transformers: 4.48.2
gradio: 5.13.1
fastapi: 0.115.4
pydantic: 2.8.2
triton: 3.1.0
NVIDIA Topology: 
        GPU0    CPU Affinity    NUMA Affinity   GPU NUMA ID
GPU0     X                              N/A

Legend:

  X    = Self
  SYS  = Connection traversing PCIe as well as the SMP interconnect between NUMA nodes (e.g., QPI/UPI)
  NODE = Connection traversing PCIe as well as the interconnect between PCIe Host Bridges within a NUMA node
  PHB  = Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU)
  PXB  = Connection traversing multiple PCIe bridges (without traversing the PCIe Host Bridge)
  PIX  = Connection traversing at most a single PCIe bridge
  NV#  = Connection traversing a bonded set of # NVLinks

Error traceback

@xiezhipeng-git
Author

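@lvhan028 The API streaming batch bug: only one result is returned and then it hangs, so I have to force-cancel via timeout. Judging from the output, the second item never even starts processing.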

@grimoire
Collaborator

You could try this:

import openai
from typing import List
import asyncio


async def chat_single(client: openai.AsyncOpenAI, prompt: str, model_name: str):
    """chat single async"""
    response = await client.chat.completions.create(
            model=model_name,
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            timeout=30
        )

    output = ''
    async for chunk in response:
        # `delta.content` can be None on some chunks, so fall back to ''
        if chunk.choices and chunk.choices[0].delta:
            output += chunk.choices[0].delta.content or ''
    return output


async def chat_all(client: openai.AsyncOpenAI, prompts: List[str], model_name: str):
    """chat async"""
    return await asyncio.gather(*[chat_single(client, p, model_name) for p in prompts])
    

def stream_batch_inference(prompts: List[str], api_key: str, model_name: str = "1") -> List[str]:
    """
    Standard streaming batch request example,
    with basic error handling and a timeout.
    """

    client = openai.AsyncOpenAI(api_key=api_key, base_url='http://0.0.0.0:23335/v1')
    results = ["" for _ in prompts]
    
    try:
        # Run all streaming requests concurrently
        results = asyncio.run(chat_all(client, prompts, model_name))
    except openai.APITimeoutError as e:
        print(f"API request timed out: {e}")
        return [f"Error: Timeout - {str(e)}" for _ in prompts]
        
    except openai.APIError as e:
        print(f"API error: {e}")
        return [f"Error: API Error - {str(e)}" for _ in prompts]

    return results


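# Usage example
if __name__ == "__main__":
    api_key = "your-api-key"  # replace with a real API key
    test_prompts = [
        "Explain the basic principles of quantum computing",
        "Write a hello world program in Python",
        "What is the capital of France?"
    ]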
    
    responses = stream_batch_inference(test_prompts, api_key)
    for i, (prompt, response) in enumerate(zip(test_prompts, responses)):
        print(f"Prompt {i+1}: {prompt}")
        print(f"Response: {response}\n{'-'*40}")
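
A possible variation on the script above, offered as a sketch rather than as lmdeploy's recommended client. The outer try/except in stream_batch_inference replaces every result with the same error string when any single request fails. Passing return_exceptions=True to asyncio.gather keeps the successful answers and reports failures per prompt, and an asyncio.Semaphore caps how many requests are in flight. The names gather_bounded, ask, and limit are hypothetical; ask stands in for a coroutine such as chat_single with the client and model name already bound.

```python
import asyncio
from typing import Awaitable, Callable, List


async def gather_bounded(
    prompts: List[str],
    ask: Callable[[str], Awaitable[str]],  # hypothetical: e.g. a wrapper around chat_single
    limit: int = 8,                        # assumed cap on in-flight requests
) -> List[str]:
    """Run ask(prompt) for every prompt, with at most `limit` running at once."""
    sem = asyncio.Semaphore(limit)

    async def guarded(prompt: str) -> str:
        async with sem:
            return await ask(prompt)

    # return_exceptions=True: a failed request becomes an exception object
    # in the result list instead of aborting the whole gather.
    outcomes = await asyncio.gather(
        *(guarded(p) for p in prompts), return_exceptions=True
    )
    # Results come back in the same order as `prompts`.
    return [
        f"Error: {type(o).__name__} - {o}" if isinstance(o, BaseException) else o
        for o in outcomes
    ]
```

With the script above, ask could be written as lambda p: chat_single(client, p, model_name), so that chat_all becomes a call to gather_bounded(prompts, ask).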

@grimoire
Collaborator

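Also, the R1 distill models should all be supported by turbomind, and AWQ performance is better on turbomind as well. If performance is what you are after, turbomind is the recommended backend.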

@xiezhipeng-git
Author

xiezhipeng-git commented Feb 18, 2025

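I am already using turbomind. It is indeed nearly 70% faster than the latest vLLM. Very nice.
Also, is there a way to pass a conversation list directly? Is it enough to change the line messages=[{"role": "user", "content": prompt}], to a full conversation `messages` that includes a system prompt?
I ask because, when going through the API, the client does not know which chat template is in use and cannot get the server model's tokenizer.
In addition, the OpenAI API has changed and now offers batch inference as a separate mode. Does that approach perform the same?

After adding the following in the .ipynb, it works.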

import nest_asyncio
nest_asyncio.apply()
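
Some background on why these two lines are needed in a notebook: Jupyter already runs an asyncio event loop, and asyncio.run refuses to start while a loop is running in the same thread; nest_asyncio patches asyncio to allow the nested call. An alternative sketch that avoids patching is to run the coroutine on a helper thread with its own loop. run_coro is a hypothetical helper, not part of lmdeploy or openai, and it is probably safer to create the AsyncOpenAI client inside the coroutine it runs rather than share one across loops.

```python
import asyncio
import threading
from typing import Any, Coroutine


def run_coro(coro: Coroutine[Any, Any, Any]) -> Any:
    """Run `coro` to completion from sync code, whether or not a loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread (plain .py script): asyncio.run works directly.
        return asyncio.run(coro)

    # A loop is already running here (e.g. a notebook cell): use a helper
    # thread, which gets its own fresh event loop from asyncio.run.
    box: dict = {}

    def worker() -> None:
        try:
            box["value"] = asyncio.run(coro)
        except BaseException as exc:  # hand any failure back to the caller
            box["error"] = exc

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()  # blocks the calling thread until the coroutine finishes
    if "error" in box:
        raise box["error"]
    return box["value"]
```

The same call, run_coro(some_coroutine()), then works both in a .py script and in a notebook cell, at the cost of blocking the caller until the result is ready.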

@grimoire

@grimoire
Collaborator

@AllentDan

@AllentDan
Collaborator

Hi @xiezhipeng-git, let me clarify a few points.

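  1. By "passing a conversation list directly", do you mean a list of dict or a list of list of dict? The former is one complete conversation; the latter is several complete conversations. Currently only the former is supported. The server handles requests concurrently, so the client can send requests from multiple threads or coroutines. A list of dict looks like this: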
    messages = [{
        'role': 'system',
        'content': 'you are a helpful assistant'
    }, {
        'role': 'user',
        'content': 'who are you'
    }, {
        'role': 'assistant',
        'content': 'I am an AI'
    }, {
        'role': 'user',
        'content': 'AGI is?'
    }]
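  2. I did not quite follow this one. However, the /v1/completions endpoint accepts a str, which can be a conversation already assembled on the client side. The same endpoint also accepts a list of str, i.e. several complete conversations. Performance is about the same.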
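
To illustrate the first point, here is a minimal sketch of fanning out a list of conversations as one request each from multiple threads. It is an illustration under stated assumptions, not lmdeploy's own client: fan_out, make_sender, and max_workers are hypothetical names, and the default api_key is a placeholder.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

Conversation = List[Dict[str, str]]  # one complete conversation: a list of dict


def fan_out(
    conversations: List[Conversation],
    send_one: Callable[[Conversation], str],
    max_workers: int = 8,  # assumed client-side concurrency
) -> List[str]:
    """Send each conversation as its own request; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(send_one, conversations))


def make_sender(base_url: str, model_name: str, api_key: str = "sk-xxx"):
    """Build a send_one callable backed by the synchronous OpenAI client."""
    import openai  # imported lazily so fan_out is usable without the package

    client = openai.OpenAI(api_key=api_key, base_url=base_url)

    def send_one(messages: Conversation) -> str:
        # One request carries exactly one conversation (list of dict).
        response = client.chat.completions.create(model=model_name, messages=messages)
        return response.choices[0].message.content or ""

    return send_one
```

Against the server started in the report, a call might look like fan_out(conversations, make_sender("http://127.0.0.1:45001/v1", "1")), where conversations is a list of list of dict.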

@xiezhipeng-git
Author

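  1. Yes, that form: passing the conversation message list directly. I tried it afterwards and it works; you just pass it in as-is.
  2. I only wanted to know whether this approach differs from the other batch processing that OpenAI officially recommends.

I think all the issues are resolved. This can be closed.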

@xiezhipeng-git
Author

xiezhipeng-git commented Feb 21, 2025

@AllentDan @grimoire
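It does not seem to work after all. There is a high probability of an error during inference, and the error is not returned to the client.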

response = await client.chat.completions.create(
        model=model_name,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        timeout=30
    )

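That is, inference may fail, and the exception caught with try / except Exception as e: stringifies to "" (empty).
At that point the server log shows the following errors: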

2025-02-21 22:05:50,734 - lmdeploy - ERROR - async_engine.py:579 - [safe_run] exception caught: CancelledError Cancelled by cancel scope 7fed884f8770
2025-02-21 22:05:50,734 - lmdeploy - ERROR - async_engine.py:579 - [safe_run] exception caught: CancelledError Cancelled by cancel scope 7fed884fad50
2025-02-21 22:05:50,734 - lmdeploy - ERROR - async_engine.py:579 - [safe_run] exception caught: CancelledError Cancelled by cancel scope 7fed884f9790
2025-02-21 22:05:50,734 - lmdeploy - ERROR - async_engine.py:579 - [safe_run] exception caught: CancelledError Cancelled by cancel scope 7fed884fa390
2025-02-21 22:05:50,734 - lmdeploy - ERROR - async_engine.py:579 - [safe_run] exception caught: CancelledError Cancelled by cancel scope 7fed884f8050
2025-02-21 22:05:50,734 - lmdeploy - ERROR - async_engine.py:579 - [safe_run] exception caught: CancelledError Cancelled by cancel scope 7fed884fbb60
2025-02-21 22:05:50,734 - lmdeploy - ERROR - async_engine.py:579 - [safe_run] exception caught: CancelledError Cancelled by cancel scope 7fed88afb830
Exception in callback <bound method StreamingSemaphore.release of <lmdeploy.turbomind.turbomind.StreamingSemaphore object at 0x7fed88539940>>
handle: <Handle StreamingSemaphore.release>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 61, in uvloop.loop.Handle._run
  File "/root/anaconda3/lib/python3.12/site-packages/lmdeploy/turbomind/turbomind.py", line 382, in release
    self.fut.set_result(None)
asyncio.exceptions.InvalidStateError: invalid state
Exception in callback <bound method StreamingSemaphore.release of <lmdeploy.turbomind.turbomind.StreamingSemaphore object at 0x7fed885397c0>>
handle: <Handle StreamingSemaphore.release>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 61, in uvloop.loop.Handle._run
  File "/root/anaconda3/lib/python3.12/site-packages/lmdeploy/turbomind/turbomind.py", line 382, in release
    self.fut.set_result(None)
asyncio.exceptions.InvalidStateError: invalid state
Exception in callback <bound method StreamingSemaphore.release of <lmdeploy.turbomind.turbomind.StreamingSemaphore object at 0x7fed88539b80>>
handle: <Handle StreamingSemaphore.release>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 61, in uvloop.loop.Handle._run
  File "/root/anaconda3/lib/python3.12/site-packages/lmdeploy/turbomind/turbomind.py", line 382, in release
    self.fut.set_result(None)
asyncio.exceptions.InvalidStateError: invalid state
Exception in callback <bound method StreamingSemaphore.release of <lmdeploy.turbomind.turbomind.StreamingSemaphore object at 0x7fed8853b110>>
handle: <Handle StreamingSemaphore.release>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 61, in uvloop.loop.Handle._run
  File "/root/anaconda3/lib/python3.12/site-packages/lmdeploy/turbomind/turbomind.py", line 382, in release
    self.fut.set_result(None)
asyncio.exceptions.InvalidStateError: invalid state
Exception in callback <bound method StreamingSemaphore.release of <lmdeploy.turbomind.turbomind.StreamingSemaphore object at 0x7fed8853aa50>>
handle: <Handle StreamingSemaphore.release>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 61, in uvloop.loop.Handle._run
  File "/root/anaconda3/lib/python3.12/site-packages/lmdeploy/turbomind/turbomind.py", line 382, in release
    self.fut.set_result(None)
asyncio.exceptions.InvalidStateError: invalid state
Exception in callback <bound method StreamingSemaphore.release of <lmdeploy.turbomind.turbomind.StreamingSemaphore object at 0x7fed8853a030>>
handle: <Handle StreamingSemaphore.release>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 61, in uvloop.loop.Handle._run
  File "/root/anaconda3/lib/python3.12/site-packages/lmdeploy/turbomind/turbomind.py", line 382, in release
    self.fut.set_result(None)
asyncio.exceptions.InvalidStateError: invalid state
Exception in callback <bound method StreamingSemaphore.release of <lmdeploy.turbomind.turbomind.StreamingSemaphore object at 0x7fed88538710>>
handle: <Handle StreamingSemaphore.release>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 61, in uvloop.loop.Handle._run
  File "/root/anaconda3/lib/python3.12/site-packages/lmdeploy/turbomind/turbomind.py", line 382, in release
    self.fut.set_result(None)
asyncio.exceptions.InvalidStateError: invalid state
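
On the empty exception text mentioned above: an exception raised without a message stringifies to "", which is the case for a bare TimeoutError or CancelledError, so str(e) alone can hide what went wrong. In addition, since Python 3.8 asyncio.CancelledError derives from BaseException, so except Exception does not catch it. Below is a small sketch of a more informative formatter; describe_exception and call_with_report are hypothetical helpers, and whether this is what produced the empty string in this report is not confirmed by the thread.

```python
import asyncio


def describe_exception(exc: BaseException) -> str:
    """Return the type name plus repr, which stays informative when str(exc) is empty."""
    return f"{type(exc).__name__}: {exc!r}"


async def call_with_report(coro):
    """Await `coro`; return (result, None) on success or (None, description) on failure."""
    try:
        return await coro, None
    except asyncio.CancelledError:
        # Not an Exception subclass since Python 3.8; re-raise so that
        # task cancellation keeps working as intended.
        raise
    except Exception as exc:
        return None, describe_exception(exc)
```

Using describe_exception(e) in place of str(e) when building the error result would at least record the exception type on the client side.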

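You can test with this code. There seem to be quite a few problems. In the if __name__ == "__main__" block, try each of the following clients in turn: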

    client = AsyncOpenAI(
        api_key="sk-xxx",
        base_url="http://localhost:45001/v1"
    )
    client = openai.OpenAI(
        api_key="sk-xxx",
        base_url="http://localhost:45001/v1"
    )
# TODO: try https://github.com/pyper-dev/pyper
import openai
import asyncio
# In an .ipynb the following two lines are required; do not add them in a .py file
# import nest_asyncio
# nest_asyncio.apply()
import time
import re
import copy
from typing import List, Optional, Union, Tuple, Deque
from openai import AsyncOpenAI, OpenAI, BadRequestError
from collections import deque

class GenerationResult:
    def __init__(self, text: str, finish_reason: Optional[str], stop_reason: Optional[str]):
        self.text = text
        self.finish_reason = finish_reason
        self.stop_reason = stop_reason

class GenerationArgs:
    def __init__(self, 
                 temperature: float = 1.0,
                 top_p: float = 1.0,
                 max_tokens: int = 20000,
                 stop_words: Optional[List[str]] = None,
                 repetition_penalty: float = 1.0):
        self.temperature = temperature
        self.top_p = top_p
        self.max_tokens = max_tokens
        self.stop_words = stop_words or []
        self.repetition_penalty = repetition_penalty

async def generate_single(
    client: Union[OpenAI, AsyncOpenAI],
    served_model_name: str,
    request_data: Union[str, List[dict]],
    is_chat: bool,
    params: GenerationArgs,
    timeout: float,
    retry_remaining: int,
    start_time: Optional[float] = None,
) -> GenerationResult:
    if start_time is None:
        start_time = time.time()
    full_text = ""
    finish_reason = None
    stop_reason = None
    response = None
    
    while retry_remaining >= 0:
        try:
            current_params = copy.deepcopy(params)
            if current_params.temperature == 0:
                current_params.temperature = None

            # Choose streaming or not based on the client type
            is_async = isinstance(client, AsyncOpenAI)
            stream = is_async  # async client streams; sync client does not
            if is_chat and stream:
                response = await client.chat.completions.create(
                    model=served_model_name,
                    messages=request_data,
                    stream=stream,
                    timeout=min(timeout, 3),
                    temperature=current_params.temperature,
                    top_p=current_params.top_p,
                    max_tokens=current_params.max_tokens,
                    stop=current_params.stop_words,
                    frequency_penalty=current_params.repetition_penalty,
                )
            elif (not is_chat) and stream:
                response = await client.completions.create(
                    model=served_model_name,
                    prompt=request_data,
                    stream=stream,
                    timeout=min(timeout, 3),
                    temperature=current_params.temperature,
                    top_p=current_params.top_p,
                    max_tokens=current_params.max_tokens,
                    stop=current_params.stop_words,
                    frequency_penalty=current_params.repetition_penalty,
                )
            elif is_chat and (not stream):
                response = client.chat.completions.create(
                    model=served_model_name,
                    messages=request_data,
                    stream=stream,
                    timeout=min(timeout, 3),
                    temperature=current_params.temperature,
                    top_p=current_params.top_p,
                    max_tokens=current_params.max_tokens,
                    stop=current_params.stop_words,
                    frequency_penalty=current_params.repetition_penalty,
                )
            else:
                response = client.completions.create(
                    model=served_model_name,
                    prompt=request_data,
                    stream=stream,
                    timeout=min(timeout, 3),
                    temperature=current_params.temperature,
                    top_p=current_params.top_p,
                    max_tokens=current_params.max_tokens,
                    stop=current_params.stop_words,
                    frequency_penalty=current_params.repetition_penalty,
                )

            if stream:  # streaming branch
                async for chunk in response:
                    try:
                        if is_chat:
                            content = chunk.choices[0].delta.content or ""
                        else:
                            content = chunk.choices[0].text or ""
                    except AttributeError:
                        content = ""
                    
                    full_text += content
                    
                    if chunk.choices[0].finish_reason:
                        finish_reason = chunk.choices[0].finish_reason
                        stop_reason = getattr(chunk.choices[0], 'stop_reason', None)
            else:  # non-streaming branch
                if is_chat:
                    full_text = response.choices[0].message.content
                else:
                    full_text = response.choices[0].text
                
                finish_reason = response.choices[0].finish_reason
                stop_reason = getattr(response.choices[0], 'stop_reason', None)

            return GenerationResult(full_text, finish_reason, stop_reason)

        except BadRequestError as e:
            error_msg = str(e)
            if "maximum context length" in error_msg and retry_remaining > 0:
                # Non-greedy `.*?` so the second group captures the whole number, not only its last digit
                match = re.search(r"(\d+) tokens.*?(\d+) tokens", error_msg)
                if match:
                    max_context = int(match.group(1))
                    used_tokens = int(match.group(2))
                    new_max_tokens = max_context - used_tokens
                    params.max_tokens = max(1, new_max_tokens)
                    retry_remaining -= 1
                    continue
            return GenerationResult(full_text, "error", "BadRequestError"+error_msg)
        except Exception as e:
            
            return GenerationResult(full_text, "error", "Exception"+str(e))
        finally:
            if response is not None:
                try:
                    if hasattr(response, 'close'):
                        await response.close()
                except Exception as close_error:
                    print(f"Failed to close response: {close_error}")
                finally:
                    response = None
    return GenerationResult(full_text, "error", "max_retries_exceeded")

async def batch_generate_async(
    client: Union[OpenAI, AsyncOpenAI],
    served_model_name: str,
    messages: Optional[List[List[dict]]] = None,
    prompts: Optional[List[str]] = None,
    is_chat_message: bool = True,
    sampling_params: Optional[GenerationArgs] = None,
    task_timeout: int = 1800,
    retry_times: int = 3,
    start_time: Optional[float] = None
) -> List[GenerationResult]:
    # Validate arguments
    if is_chat_message:
        if messages is None:
            raise ValueError("the messages argument is required")
        requests = messages
    else:
        if prompts is None:
            raise ValueError("the prompts argument is required")
        requests = prompts
    
    if len(requests) == 0:
        return []
    sampling_params = sampling_params or GenerationArgs()
    tasks = []
    
    for idx in range(len(requests)):
        request_data = requests[idx]
        task = generate_single(
            client=client,
            served_model_name=served_model_name,
            request_data=request_data,
            is_chat=is_chat_message,
            params=sampling_params,
            timeout=task_timeout,
            retry_remaining=retry_times,
            start_time=start_time
        )
        tasks.append(task)
    
    return await asyncio.gather(*tasks)

def batch_generate(
    client: Union[OpenAI, AsyncOpenAI],
    served_model_name: str,
    messages: Optional[List[List[dict]]] = None,
    prompts: Optional[List[str]] = None,
    is_chat_message: bool = True,
    sampling_params: Optional[GenerationArgs] = None,
    task_timeout: int = 1800,
    retry_times: int = 3,
    start_time = None,
) -> List[GenerationResult]:
    async def _async_wrapper(client):
        # if isinstance(client, AsyncOpenAI):
        #     async_client = client
        # else:
        #     # For a sync client, create a new async client (reusing the API config)
        #     async_client = AsyncOpenAI(
        #         api_key=client.api_key,
        #         base_url=client.base_url,
        #         # other parameters (timeout, etc.) can be passed along as well
        #     ) 
        return await batch_generate_async(
            client=client,
            served_model_name=served_model_name,
            messages=messages,
            prompts=prompts,
            is_chat_message=is_chat_message,
            sampling_params=sampling_params,
            task_timeout=task_timeout,
            retry_times=retry_times,
            start_time=start_time,
        )

    return asyncio.run(_async_wrapper(client))


async def parallel_generate_async(
    clients:  Union[List[AsyncOpenAI], List[OpenAI]],
    served_model_names: List[str],
    messages_list: Optional[List[List[dict]]] = None,
    prompts_list: Optional[List[str]] = None,
    is_chat_message: bool = True,
    sampling_params: Union[GenerationArgs, List[GenerationArgs]] = None,
    task_timeout: int = 1800,
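    retry_times: int = 1,  # threshold of consecutive errors
    start_time: Optional[float] = None,
    max_batchs: Optional[List[int]] = None,  # maximum parallelism per client

) -> List[GenerationResult]:
    """
    Dynamic fault-tolerant version. Key features:
    1. Only one tuning parameter is needed: max_batchs
    2. Work is allocated dynamically from each client's parallel capacity and the remaining tasks
    3. Consecutive errors trip a circuit breaker automatically
    """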
    if not start_time:
        start_time = time.time()
    if max_batchs is None:
        max_batchs = [None] * len(clients)
    # Validate inputs
    assert len(clients) == len(served_model_names) == len(max_batchs), f"argument lengths differ {len(clients)} {len(served_model_names)} {len(max_batchs)}"
    
    # Prepare the global task queue
    input_data = messages_list if is_chat_message else prompts_list
    assert input_data is not None, "messages_list or prompts_list must be provided"
    task_queue = deque(enumerate(input_data))
    total_tasks = len(input_data)
    results = [None] * total_tasks
    queue_lock = asyncio.Lock()
    # Total number of explicitly allocated tasks
    allocated_tasks = sum(m for m in max_batchs if m is not None)

    # Remaining tasks still to be allocated (never negative)
    remaining_tasks = max(total_tasks - allocated_tasks, 0)

    # Number of clients that need dynamic allocation
    dynamic_clients = sum(1 for m in max_batchs if m is None)

    # Default quota for dynamically allocated clients (rounded up)
    default_one_times_max = (remaining_tasks + dynamic_clients - 1) // dynamic_clients if dynamic_clients > 0 else 0

    client_status = {
        client: {
            "max_conc": max_conc if max_conc is not None else default_one_times_max,
            "active": 0,
            "error_count": 0
        } for client, max_conc in zip(clients, max_batchs)
    }
    status_lock = asyncio.Lock()

    async def process_client(client, model_name, sampling_param):
        """Dynamic task processor"""
        while True:
            # Get the currently available slots
            # if time.time() - start_time > task_timeout:
            #     return
            async with status_lock:
                status = client_status[client]
                if status["error_count"] >= retry_times:
                    return  # circuit breaker tripped
                
                available = status["max_conc"] - status["active"]
                # if available <= 0 or not task_queue:
                #     return
                if available <= 0:
                    await asyncio.sleep(0.1)  # wait for a free slot
                    continue
                base_batch = max(1, len(task_queue) // max(len(clients), 1))
                # Compute the dynamic batch size
                batch_size = min(
                    base_batch,  # base allocation
                    available,  # no more than the available slots
                    len(task_queue)  # no more than the remaining tasks
                )
                status["active"] += batch_size

            # Take a batch of tasks
            async with queue_lock:
                if not task_queue:
                    # Global completion check
                    if all(r is not None for r in results):
                        return

                    await asyncio.sleep(0.2)
                    continue
                batch = [task_queue.popleft() for _ in range(batch_size)] if task_queue else []

            try:
                indices, data = zip(*batch) if batch else ([], [])
                
                # Run the batch
                batch_results = await batch_generate_async(
                    client=client,
                    served_model_name=model_name,
                    messages=list(data) if is_chat_message else None,
                    prompts=list(data) if not is_chat_message else None,
                    is_chat_message=is_chat_message,
                    sampling_params=sampling_param,
                    task_timeout=task_timeout,
                    retry_times=retry_times*len(data),
                )

                # On success, reset the error counter
                async with status_lock:
                    client_status[client]["error_count"] = 0
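            except Exception as e:
                # Record the error
                
                async with status_lock:
                    client_status[client]["error_count"] += 1
                    if client_status[client]["error_count"] >= retry_times:
                        print(f"Client {model_name} tripped the circuit breaker! TODO: add restart/repair handling, plus rounds, subtracting the tasks already allocated in the current round")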
                
                # async with queue_lock:
                #     for task in batch:
                #         task_queue.appendleft(task)
                batch_results = [GenerationResult("", "error", str(e))] * len(batch)
            finally:
                async with status_lock:
                    client_status[client]["active"] -= batch_size
            async with queue_lock:
                remaining = sum(1 for r in results if r is None)
                if remaining == 0:
                    return
            # Write the results
            async with queue_lock:
                for idx, result in zip(indices, batch_results):
                    results[idx] = result
    use_sampling_params = None
    if sampling_params is None:
        use_sampling_params = [GenerationArgs()] * len(clients)  # create default parameter objects
        # print("something is off 1")
    elif not isinstance(sampling_params, list):
        use_sampling_params = [sampling_params] * len(clients)
        # print("something is off 2")
    else:
        use_sampling_params = copy.deepcopy(sampling_params)  # deep copy
        # print("something is off 3")
    # print("len(clients)",len(clients))
    # print("sampling_params",sampling_params)
    # print("use_sampling_params",use_sampling_params)
    # Start a processor for every client
    tasks = []
    for client, model_name, sampling_param in zip(clients, served_model_names,use_sampling_params):
        # # Create max_concurrent workers for each client
        # max_workers = client_status[client]["max_conc"]
        # # Create the corresponding number of workers for each client
        # for _ in range(max_workers):
        #     tasks.append(asyncio.create_task(process_client(client, model_name)))
        # Each client needs only one processor; its internal loop handles multiple batches
        tasks.append(asyncio.create_task(process_client(client, model_name, sampling_param)))

    await asyncio.gather(*tasks)
    for i in range(len(results)):
        if results[i] is None:
            results[i] = GenerationResult(
            text="", 
            finish_reason="timeout not begin",
            stop_reason="timeout not begin"
        )
    return results

def parallel_generate(
    clients: List[Union[OpenAI, AsyncOpenAI]],
    served_model_names: List[str],
    messages_list: Optional[List[List[dict]]] = None,
    prompts_list: Optional[List[str]] = None,
    is_chat_message: bool = True,
    sampling_params: Union[GenerationArgs, List[GenerationArgs]] = None,
    task_timeout: int = 1800,
    retry_times: int = 1,
    start_time: Optional[float] = None,
    max_batchs: Optional[List[int]] = None,

) -> List[GenerationResult]:
    async def _async_wrapper():
        # Convert sync clients to async clients
        # async_clients = []
        # for client in clients:
        #     if isinstance(client, AsyncOpenAI):
        #         async_clients.append(client)
        #     else:
        #         async_clients.append(AsyncOpenAI(
        #             api_key=client.api_key,
        #             base_url=client.base_url
        #         ))
        
        return await parallel_generate_async(
            clients=clients,
            served_model_names=served_model_names,
            messages_list=messages_list,
            prompts_list=prompts_list,
            is_chat_message=is_chat_message,
            sampling_params=sampling_params,
            task_timeout=task_timeout,
            retry_times=retry_times,
            start_time=start_time,
            max_batchs=max_batchs,

        )

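    try:
        # Check whether an event loop is already running in this thread
        asyncio.get_running_loop()
    except RuntimeError:
        # No running event loop: create one
        return asyncio.run(_async_wrapper())
    # A loop is already running here. loop.run_until_complete() cannot nest inside it
    # (it raises RuntimeError), so run the coroutine on its own loop in a worker thread.
    import concurrent.futures
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, _async_wrapper()).result()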
    
if __name__ == "__main__":

    # Initialize the client
    # client = openai.OpenAI(
    #     api_key="sk-xxx",
    #     base_url="http://localhost:45001/v1"
    # )
    client = AsyncOpenAI(
        api_key="sk-xxx",
        base_url="http://localhost:45001/v1"
    )
    # First call: message-list (chat) mode
    print("Testing chat mode".center(50, "="))
    test_messages = [
        [
            {"role": "user", "content": "You are a helpful agent."},
            {"role": "assistant", "content": "Which city would you like to visit?"},
            {"role": "user", "content": "What is the capital of France?"}
        ],
        [{"role": "user", "content": "1+1="}],

    ]
    chat_results = batch_generate(
        client=client,
        served_model_name="1",
        messages=test_messages  # detected as chat mode automatically
    )
    for i, result in enumerate(chat_results):
        print(f"Chat result {i+1}:", result.finish_reason, result.stop_reason)
        print(result.text)
        print("-" * 50)
    
    # Second call: prompt (completion) mode
    print("\n" + "Testing prompt mode".center(50, "="))
    test_prompts = [
        "What is the capital of France?",
        "1+1=",

    ]
    completion_results = batch_generate(
        client=client,
        served_model_name="1",
        prompts=test_prompts,  # select prompt mode explicitly
        is_chat_message=False  # important: turn chat mode off explicitly
    )
    for i, result in enumerate(completion_results):
        print(f"Prompt result {i+1}:", result.finish_reason, result.stop_reason)
        print(result.text)
        print("-" * 50)
        

The errors mostly go away only after changing batch_generate so that it creates a new client on every call and closes it afterwards:

def batch_generate(
    client: Union[OpenAI, AsyncOpenAI],
    served_model_name: str,
    messages: Optional[List[List[dict]]] = None,
    prompts: Optional[List[str]] = None,
    is_chat_message: bool = True,
    sampling_params: Optional[GenerationArgs] = None,
    task_timeout: int = 1800,
    retry_times: int = 3,
    start_time = None,
) -> List[GenerationResult]:
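    async def _async_wrapper(is_async, api_key, base_url):
        # Temporary workaround for the lmdeploy API bug: always create a new client.
        # TODO: drop the create/close handling once lmdeploy behaves correctly
        if is_async:
            # Async client passed in: create a fresh async client that reuses its API configuration
            client = AsyncOpenAI(
                api_key=api_key,
                base_url=base_url,
                # Other parameters (e.g. timeout) can be passed through here
            )
        else:
            # Sync client passed in: create a fresh sync client with the same configuration
            client = OpenAI(
                api_key=api_key,
                base_url=base_url,
                # Other parameters (e.g. timeout) can be passed through here
            )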
        return await batch_generate_async(
            client=client,
            served_model_name=served_model_name,
            messages=messages,
            prompts=prompts,
            is_chat_message=is_chat_message,
            sampling_params=sampling_params,
            task_timeout=task_timeout,
            retry_times=retry_times,
            start_time=start_time,
        )
    is_async = isinstance(client, AsyncOpenAI)
    return asyncio.run(_async_wrapper(is_async,client.api_key,client.base_url))

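The better outcome, though, would be for the program to run correctly even with the response.close() code removed. The code the AI originally produced hung precisely because the response was never closed. As things stand, this code raises exceptions to some degree no matter how I handle it, even though it does return results. (Because the current code does not create a new OpenAI client for every request, it does not guard against using a client that has already been closed. I left it that way on purpose, so that it should still work once response.close() and the per-call client creation are removed later.)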

@AllentDan
Collaborator

Judging from the log, the CancelledError suggests the client cancelled (break) while the stream was still being generated.

@xiezhipeng-git
Author

xiezhipeng-git commented Feb 26, 2025

> Judging from the log, the CancelledError suggests the client cancelled (break) while the stream was still being generated.

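That does not seem to be the only cause. At the time I had added timeout handling inside this loop:

async for chunk in response:
    if time.time() - start_time > task_timeout:
        print("Timed out! Returning the current result.")
        break

But breaking out after a timeout is a normal requirement: once the timeout is hit, the rest of the response is simply abandoned and no longer requested. Besides, I later removed the break-related code from what I posted, and other errors still occur. You can try running the code I provided: unless a new OpenAI client is created and then closed for every call, errors appear.
Moreover, when several small-model APIs run on one machine (for example several 0.5B models), or when four API servers run on the four devices on Kaggle, errors occur even if the client is deleted every time.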

lmdeploy - ERROR - async_engine.py:592 - [safe_run] exception caught: CancelledError Cancelled by cancel scope 7f72abd68890
Exception in callback <bound method StreamingSemaphore.release of <lmdeploy.turbomind.turbomind.StreamingSemaphore object at 0x7f72abd6a990>>
handle: <Handle StreamingSemaphore.release>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 61, in uvloop.loop.Handle._run
  File "/root/anaconda3/lib/python3.12/site-packages/lmdeploy/turbomind/turbomind.py", line 388, in release
    self.fut.set_result(None)
asyncio.exceptions.InvalidStateError: invalid state
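
For context on the last line of this traceback: `asyncio.Future.set_result()` raises `InvalidStateError` when the future is already done, and a cancelled future counts as done. The sketch below reproduces that with a plain future and shows the usual `done()` guard. It only illustrates the asyncio behavior; it is not lmdeploy's `StreamingSemaphore`, and whether such a guard is the right fix there is an assumption. `release` and `safe_release` are hypothetical names.

```python
import asyncio


def release(fut):
    """Unguarded callback: fails if the waiter was already cancelled."""
    fut.set_result(None)


def safe_release(fut):
    """Guarded callback: skip futures that are already done or cancelled."""
    if not fut.done():
        fut.set_result(None)
        return True
    return False


async def demo():
    loop = asyncio.get_running_loop()

    cancelled = loop.create_future()
    cancelled.cancel()  # e.g. the request was cancelled while streaming
    try:
        release(cancelled)
        unguarded = "ok"
    except asyncio.InvalidStateError:
        unguarded = "InvalidStateError"

    guarded_on_cancelled = safe_release(cancelled)             # skipped
    guarded_on_pending = safe_release(loop.create_future())    # result is set
    return unguarded, guarded_on_cancelled, guarded_on_pending


outcome = asyncio.run(demo())
```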

On Kaggle L4 x4 this error occurs in as many as 40% of runs, which makes it impossible to use multiple API endpoints at all.
@AllentDan @grimoire
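
On the timeout check discussed above: a `time.time()` comparison inside `async for` only runs when a chunk arrives, so it cannot fire while the server is stalled. A minimal sketch of one alternative is to bound each wait with `asyncio.wait_for` against an overall deadline. `stalling_stream` and `read_with_deadline` are hypothetical names, and the sketch assumes a real response stream can be driven like any other async iterator that also offers `aclose()`.

```python
import asyncio
import time


async def stalling_stream():
    """Hypothetical stream: two quick chunks, then the server stalls."""
    yield "hello"
    yield " world"
    await asyncio.sleep(3600)  # simulates a server that never answers
    yield "never delivered"


async def read_with_deadline(stream, task_timeout):
    """Collect chunks until the stream ends or `task_timeout` seconds pass."""
    deadline = time.monotonic() + task_timeout
    chunks, finish_reason = [], "stop"
    iterator = stream.__aiter__()
    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                finish_reason = "timeout"
                break
            try:
                # Bound the wait for the next chunk by the time that is left
                chunk = await asyncio.wait_for(iterator.__anext__(), remaining)
            except StopAsyncIteration:
                break  # the stream finished normally
            except asyncio.TimeoutError:
                finish_reason = "timeout"
                break
            chunks.append(chunk)
    finally:
        # Close the stream so the connection is released on every exit path
        await iterator.aclose()
    return "".join(chunks), finish_reason


text, reason = asyncio.run(read_with_deadline(stalling_stream(), 0.2))
```

The partial text is returned together with a `"timeout"` reason, which matches the stated requirement of returning the current result once the time limit is hit.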

@xiezhipeng-git
Author

The following error can also occur @AllentDan

2025-03-07 02:08:21,859 - lmdeploy - ERROR - async_engine.py:592 - [safe_run] exception caught: GeneratorExit
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/root/anaconda3/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py", line 401, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/anaconda3/lib/python3.12/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/anaconda3/lib/python3.12/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/applications.py", line 113, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/middleware/errors.py", line 187, in __call__
    raise exc
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/middleware/errors.py", line 165, in __call__
    await self.app(scope, receive, _send)
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/middleware/cors.py", line 85, in __call__
    await self.app(scope, receive, send)
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/routing.py", line 715, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/routing.py", line 735, in app
    await route.handle(scope, receive, send)
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/routing.py", line 288, in handle
    await self.app(scope, receive, send)
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/routing.py", line 76, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/root/anaconda3/lib/python3.12/site-packages/starlette/routing.py", line 73, in app
    response = await f(request)
               ^^^^^^^^^^^^^^^^
  File "/root/anaconda3/lib/python3.12/site-packages/fastapi/routing.py", line 301, in app
    raw_response = await run_endpoint_function(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/anaconda3/lib/python3.12/site-packages/fastapi/routing.py", line 212, in run_endpoint_function
    return await dependant.call(**values)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/anaconda3/lib/python3.12/site-packages/lmdeploy/serve/openai/api_server.py", line 699, in completions_v1
    response = CompletionResponse(
               ^^^^^^^^^^^^^^^^^^^
  File "/root/anaconda3/lib/python3.12/site-packages/pydantic/main.py", line 193, in __init__
    self.__pydantic_validator__.validate_python(data, self_instance=self)
pydantic_core._pydantic_core.ValidationError: 1 validation error for CompletionResponse
choices.0
  Input should be a valid dictionary or instance of CompletionResponseChoice [type=model_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.8/v/model_type
2025-03-07 02:09:32,328 - lmdeploy - ERROR - async_engine.py:592 - [safe_run] exception caught: GeneratorExit
