Use LifoQueue for turbomind async_stream_infer #1179

Merged: 3 commits, Feb 27, 2024

Conversation

AllentDan
Collaborator

#1138 introduced asyncio.LifoQueue, but it is not thread-safe when used from functions that run in other threads, such as forward_callback or forward_thread.
Use a normal LifoQueue to avoid frequently popping items, which may pop out the final result.
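
As a minimal sketch of why the standard-library queue suits multi-threaded producers: `queue.LifoQueue` guards `put` and `get` with an internal lock, so several threads can push concurrently without losing items. The names `put_many` and `run_producers` are hypothetical and only stand in for threads such as forward_callback and forward_thread; this is not the code from the PR.

```python
import queue
import threading


def put_many(que, worker_id, count):
    """Hypothetical producer: push `count` items tagged with the worker id."""
    for i in range(count):
        que.put((worker_id, i))


def run_producers(num_workers=4, count=1000):
    """Start several producer threads against one thread-safe LifoQueue."""
    que = queue.LifoQueue()
    threads = [
        threading.Thread(target=put_many, args=(que, worker_id, count))
        for worker_id in range(num_workers)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return que


que = run_producers()
print(que.qsize())  # 4000: no item is lost under concurrent puts
```

By contrast, the asyncio documentation states that asyncio queues are not thread-safe, which matches the problem described above.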

AllentDan and others added 2 commits February 21, 2024 12:21
* put queue as an argument to the function

* use asyncio.Queue

* lifoque for async stream infer

* lifoque for turbomind

* remove que pop

* recover stream_infer

* fix repeated yield
@AllentDan changed the title from "Lifoque" to "Use LifoQueue for turbomind async_stream_infer" on Feb 22, 2024
@zhyncs
Collaborator

zhyncs commented Feb 22, 2024

Hi @AllentDan If we use LifoQueue rather than asyncio.LifoQueue, does the issue of Python's Queue blocking coroutines still exist? #1138 (comment)

@AllentDan
Collaborator Author

AllentDan commented Feb 22, 2024

The previous PR was meant to fix a thread-safety bug. The stream_infer function uses two threads to put items into a queue. The expected behavior is that the last response (finish=True) is pushed to the queue in forward_thread. However, that is not what happens with the following test script:

from concurrent.futures import ThreadPoolExecutor

from tqdm import tqdm

from lmdeploy.serve.openai.api_client import APIClient

# 1000 copies of the same prompt ("Who are you?")
questions = ['你是谁'] * 1000

# Number of concurrent client threads
num_parallel = 512


def process_one(question, url='0.0.0.0', port='23333'):
    client = APIClient('http://{}:{}'.format(url, port))
    model_name = client.available_models[0]

    msg = [dict(role='user', content=question)]

    data = client.chat_completions_v1(model=model_name, messages=msg)

    # Drain the response iterator and keep only the last item.
    response = None
    for item in data:
        response = item

    return response


with ThreadPoolExecutor(max_workers=num_parallel) as executor:
    for response in tqdm(executor.map(process_one, questions)):
        print(response)

With this script, the last response is not always pushed to the queue as the last item. Somehow, another response (finish=False) from forward_callback ends up last. In that case, the expected response (finish=True) is popped out in the following line.


Consequently, the loop never ends, because the consumer thread never gets the expected last response (finish=True). This is why, with the test script, one request never finishes, and there is always a thread looping in
while self.que.qsize() == 0:
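
The failure described in this comment can be replayed deterministically in a single thread. The sketch below assumes a consumer that takes the newest item from a LIFO queue and discards everything older; `drain_keep_newest` and `replay` are hypothetical helpers for illustration, not the functions in lmdeploy.

```python
import queue


def drain_keep_newest(que):
    """Assumed consumer step: keep the newest item, discard older ones."""
    newest = que.get_nowait()
    while not que.empty():
        que.get_nowait()
    return newest


def replay(order):
    """Push items in the given order, then run one consumer step."""
    que = queue.LifoQueue()
    for item in order:
        que.put(item)
    return drain_keep_newest(que)


# Items are (finish, text) pairs.
expected_order = [(False, 'tok-1'), (False, 'tok-2'), (True, 'done')]
# Racy interleaving: a late callback item lands after the final one.
racy_order = [(False, 'tok-1'), (True, 'done'), (False, 'tok-2')]

print(replay(expected_order))  # (True, 'done')
print(replay(racy_order))      # (False, 'tok-2')
```

In the racy case the final item is thrown away during the drain, so a consumer that waits for finish=True would keep polling an empty queue.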

@AllentDan
Collaborator Author

Hi @AllentDan If we use LifoQueue rather than asyncio.LifoQueue, does the issue of Python's Queue blocking coroutines still exist? #1138 (comment)

As for blocking, I tested the performance. It is the same whether or not we use asyncio.Queue to get an item.
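
One way a coroutine can read from a thread-safe `queue.LifoQueue` without blocking the event loop is to poll the queue size and yield with `asyncio.sleep` while it is empty. The sketch below is only an assumption about that pattern, modeled on the `while self.que.qsize() == 0:` loop quoted earlier; `producer`, `wait_for_final`, and `poll_interval` are hypothetical names.

```python
import asyncio
import queue
import threading
import time


def producer(que):
    """Hypothetical worker thread standing in for the inference thread."""
    for i in range(3):
        time.sleep(0.01)
        que.put((False, f'tok-{i}'))
    que.put((True, 'done'))


async def wait_for_final(que, poll_interval=0.001):
    """Poll the queue, yielding to the event loop while it is empty."""
    while True:
        while que.qsize() == 0:
            await asyncio.sleep(poll_interval)
        # Single consumer: the queue is non-empty here, so get() returns at once.
        finished, text = que.get()
        if finished:
            return text


async def main():
    que = queue.LifoQueue()
    worker = threading.Thread(target=producer, args=(que,))
    worker.start()
    result = await wait_for_final(que)
    worker.join()
    return result


if __name__ == '__main__':
    print(asyncio.run(main()))  # done
```

Because the consumer only calls `get()` after seeing a non-empty queue, the blocking call returns immediately and other coroutines can run during the sleeps.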

@irexyc
Collaborator

irexyc commented Feb 22, 2024

https://stackoverflow.com/questions/32889527/is-there-a-way-to-use-asyncio-queue-in-multiple-threads

It seems asyncio.LifoQueue works once you add queue._loop._write_to_self().

@AllentDan
Collaborator Author

https://stackoverflow.com/questions/32889527/is-there-a-way-to-use-asyncio-queue-in-multiple-threads

It seems asyncio.LifoQueue works once you add queue._loop._write_to_self().

Yes, I tested the method and it worked. However, the benchmark result is comparatively worse than without queue._loop._write_to_self(). It seems manually switching threads slows down the server.

@lvhan028 requested review from irexyc and lvhan028 on February 23, 2024 07:27
@irexyc
Collaborator

irexyc commented Feb 26, 2024

Yes, I tested the method and it worked. However, the benchmark result is comparatively worse than without queue._loop._write_to_self(). It seems manually switching threads slows down the server.

I tested restful_api on my side, and the results are about the same.

@AllentDan
Collaborator Author

Yes, I tested the method and it worked. However, the benchmark result is comparatively worse than without queue._loop._write_to_self(). It seems manually switching threads slows down the server.

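I tested restful_api on my side, and the results are about the same.

Then we need to compare results again. If the asyncio queue can be used, we should prefer it.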

@AllentDan
Collaborator Author

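I tried que._loop.call_soon_threadsafe(que.put_nowait, (True, output)). It keeps the pipeline from hanging and preserves performance (it even improved by 8%). However, it cannot handle the high-concurrency test below and crashes.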

from concurrent.futures import ThreadPoolExecutor

from tqdm import tqdm

from lmdeploy.serve.openai.api_client import APIClient

# 1000 copies of the same prompt ("Who are you?")
questions = ['你是谁'] * 1000

# Number of concurrent client threads
num_parallel = 1000


def process_one(question, url='0.0.0.0', port='23333'):
    client = APIClient('http://{}:{}'.format(url, port))
    model_name = client.available_models[0]

    msg = [dict(role='user', content=question)]

    data = client.chat_completions_v1(model=model_name, messages=msg)

    # Drain the response iterator and keep only the last item.
    response = None
    for item in data:
        response = item

    return response


with ThreadPoolExecutor(max_workers=num_parallel) as executor:
    for response in tqdm(executor.map(process_one, questions)):
        print(response)
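
The call_soon_threadsafe idea mentioned in this comment can be sketched with public asyncio APIs only. This is a minimal illustration under assumptions: it passes the loop explicitly instead of reading the private `que._loop` attribute, it uses a FIFO `asyncio.Queue` so the output order is deterministic, and `worker` is a hypothetical stand-in for the inference thread.

```python
import asyncio
import threading


def worker(loop, que):
    """Hypothetical worker thread: schedule puts on the loop's own thread."""
    for i in range(3):
        loop.call_soon_threadsafe(que.put_nowait, (False, f'tok-{i}'))
    loop.call_soon_threadsafe(que.put_nowait, (True, 'done'))


async def main():
    loop = asyncio.get_running_loop()
    que = asyncio.Queue()
    thread = threading.Thread(target=worker, args=(loop, que))
    thread.start()

    outputs = []
    while True:
        # Awaiting get() suspends this coroutine without blocking the loop.
        finished, text = await que.get()
        outputs.append(text)
        if finished:
            break

    thread.join()
    return outputs


if __name__ == '__main__':
    print(asyncio.run(main()))  # ['tok-0', 'tok-1', 'tok-2', 'done']
```

Since every `put_nowait` runs on the event loop thread, the asyncio queue is never touched from another thread. The sketch says nothing about the crash under high concurrency reported above.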

@lvhan028
Collaborator

lvhan028 commented Feb 27, 2024

I tried que._loop.call_soon_threadsafe(que.put_nowait, (True, output)). It keeps the pipeline from hanging and preserves performance (it even improved by 8%). However, it cannot handle the high-concurrency test and crashes.

@zhulinJulia24 Please add the high-concurrency test to the test cases.

@lvhan028 merged commit 21244eb into InternLM:main on Feb 27, 2024
4 checks passed
@irexyc
Collaborator

irexyc commented Feb 27, 2024

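Running as root inside the docker image, or reducing num_parallel to 256 as a non-root user, avoids the "Max retries exceeded with url" error. I suspect that requests has a limit on the number of connections for non-root users.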

@ExenVitor

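I am curious why this problem is only observed in the high-concurrency api server mode. In principle, large-batch inference with pipeline should also have some chance of being affected by this bug.
I previously ran pipeline many times and never hit a block, using the same dataset as #1198, with 200,000 inferences in total.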
