This repository was archived by the owner on Dec 6, 2024. It is now read-only.

Create openai_api.py #35

Open · wants to merge 2 commits into master
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -1,6 +1,7 @@
 cmake_minimum_required(VERSION 3.12)
 project(qwen.cpp VERSION 0.0.1 LANGUAGES CXX)
 
+set(CMAKE_POSITION_INDEPENDENT_CODE ON)
 set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib CACHE STRING "")
 set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib CACHE STRING "")
 set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin CACHE STRING "")
185 changes: 185 additions & 0 deletions qwen_cpp/openai_api.py
@@ -0,0 +1,185 @@
import asyncio
import logging
import time
from typing import List, Literal, Optional, Union

import qwen_cpp
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
from sse_starlette.sse import EventSourceResponse

logging.basicConfig(level=logging.INFO, format=r"%(asctime)s - %(module)s - %(levelname)s - %(message)s")


class Settings(BaseSettings):
    model: str = "qwen14b-ggml.bin"
    tiktoken: str = "Qwen-14B-Chat/qwen.tiktoken"
    num_threads: int = 0


class ChatMessage(BaseModel):
    role: Literal["system", "user", "assistant"]
    content: str


class DeltaMessage(BaseModel):
    role: Optional[Literal["system", "user", "assistant"]] = None
    content: Optional[str] = None


class ChatCompletionRequest(BaseModel):
    model: str = "default-model"
    messages: List[ChatMessage]
    temperature: float = Field(default=0.95, ge=0.0, le=2.0)
    top_p: float = Field(default=0.7, ge=0.0, le=1.0)
    stream: bool = False
    max_tokens: int = Field(default=2048, ge=0)

    model_config = {
        "json_schema_extra": {"examples": [{"model": "default-model", "messages": [{"role": "user", "content": "你好"}]}]}
    }


class ChatCompletionResponseChoice(BaseModel):
    index: int = 0
    message: ChatMessage
    finish_reason: Literal["stop", "length"] = "stop"


class ChatCompletionResponseStreamChoice(BaseModel):
    index: int = 0
    delta: DeltaMessage
    finish_reason: Optional[Literal["stop", "length"]] = None


class ChatCompletionResponse(BaseModel):
    id: str = "chatcmpl"
    model: str = "default-model"
    object: Literal["chat.completion", "chat.completion.chunk"]
    created: int = Field(default_factory=lambda: int(time.time()))
    choices: Union[List[ChatCompletionResponseChoice], List[ChatCompletionResponseStreamChoice]]

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "id": "chatcmpl",
                    "model": "default-model",
                    "object": "chat.completion",
                    "created": 1691166146,
                    "choices": [
                        {
                            "index": 0,
                            "message": {"role": "assistant", "content": "你好!"},
                            "finish_reason": "stop",
                        }
                    ],
                }
            ]
        }
    }


settings = Settings()
app = FastAPI()
app.add_middleware(
    CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]
)
pipeline = qwen_cpp.Pipeline(settings.model, settings.tiktoken)
lock = asyncio.Lock()


def stream_chat(history, body):
    # First chunk carries only the assistant role, mirroring the OpenAI streaming format.
    yield ChatCompletionResponse(
        object="chat.completion.chunk",
        choices=[ChatCompletionResponseStreamChoice(delta=DeltaMessage(role="assistant"))],
    )

    # Subsequent chunks carry the generated text piece by piece.
    for piece in pipeline.chat(
        history,
        max_length=body.max_tokens,
        do_sample=body.temperature > 0,
        top_p=body.top_p,
        temperature=body.temperature,
        num_threads=settings.num_threads,
        stream=True,
    ):
        yield ChatCompletionResponse(
            object="chat.completion.chunk",
            choices=[ChatCompletionResponseStreamChoice(delta=DeltaMessage(content=piece))],
        )

    # Final chunk has an empty delta and signals completion via finish_reason.
    yield ChatCompletionResponse(
        object="chat.completion.chunk",
        choices=[ChatCompletionResponseStreamChoice(delta=DeltaMessage(), finish_reason="stop")],
    )


async def stream_chat_event_publisher(history, body):
    output = ""
    try:
        async with lock:
            for chunk in stream_chat(history, body):
                await asyncio.sleep(0)  # yield control back to event loop for cancellation check
                output += chunk.choices[0].delta.content or ""
                yield chunk.model_dump_json(exclude_unset=True)
        logging.info(f'prompt: "{history[-1]}", stream response: "{output}"')
    except asyncio.CancelledError as e:
        logging.info(f'prompt: "{history[-1]}", stream response (partial): "{output}"')
        raise e


@app.post("/v1/chat/completions")
async def create_chat_completion(body: ChatCompletionRequest) -> ChatCompletionResponse:
    # ignore system messages
    history = [msg.content for msg in body.messages if msg.role != "system"]
    # history must alternate user/assistant and end with a user prompt, i.e. have odd length
    if len(history) % 2 != 1:
        raise HTTPException(status.HTTP_400_BAD_REQUEST, "invalid history size")

    if body.stream:
        generator = stream_chat_event_publisher(history, body)
        return EventSourceResponse(generator)

    output = pipeline.chat(
        history=history,
        max_length=body.max_tokens,
        do_sample=body.temperature > 0,
        top_p=body.top_p,
        temperature=body.temperature,
        num_threads=settings.num_threads,  # honor the configured thread count, as the streaming path does
    )
    logging.info(f'prompt: "{history[-1]}", sync response: "{output}"')

    return ChatCompletionResponse(
        object="chat.completion",
        choices=[ChatCompletionResponseChoice(message=ChatMessage(role="assistant", content=output))],
    )


class ModelCard(BaseModel):
    id: str
    object: Literal["model"] = "model"
    owned_by: str = "owner"
    permission: List = []


class ModelList(BaseModel):
    object: Literal["list"] = "list"
    data: List[ModelCard] = []

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "object": "list",
                    "data": [{"id": "gpt-3.5-turbo", "object": "model", "owned_by": "owner", "permission": []}],
                }
            ]
        }
    }


@app.get("/v1/models")
async def list_models() -> ModelList:
    return ModelList(data=[ModelCard(id="gpt-3.5-turbo")])
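
For reviewers who want to exercise the new endpoints, here is a minimal client-side sketch. It assumes the app has been started separately (for example with the uvicorn CLI pointed at this module) and is listening on http://127.0.0.1:8000; the module path, host, port, and the use of the requests package are assumptions for illustration, not part of this diff.

# Hypothetical client sketch for the endpoints added in this PR.
# Assumes the server is already running locally, e.g.:
#   uvicorn qwen_cpp.openai_api:app --host 127.0.0.1 --port 8000
# (module path, host, and port are assumptions, not part of this diff)
import requests

BASE_URL = "http://127.0.0.1:8000"  # assumed address of the running server

# /v1/models returns the single hard-coded model card.
print(requests.get(f"{BASE_URL}/v1/models").json())

# Non-streaming request against /v1/chat/completions, following the
# ChatCompletionRequest schema defined in openai_api.py.
payload = {
    "model": "default-model",
    "messages": [{"role": "user", "content": "你好"}],
    "temperature": 0.95,
    "top_p": 0.7,
    "stream": False,
    "max_tokens": 2048,
}
resp = requests.post(f"{BASE_URL}/v1/chat/completions", json=payload)
print(resp.json()["choices"][0]["message"]["content"])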