feat: open telemetry
so2liu committed Jan 17, 2025
1 parent aae1787 commit 6b4d5b0
Showing 6 changed files with 159 additions and 68 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,3 @@
{
"python.analysis.typeCheckingMode": "basic"
}
8 changes: 5 additions & 3 deletions app/env_config.py
@@ -7,9 +7,11 @@

@dataclass
class EnvConfig:
OPENAI_BASE_URL: str = os.getenv("OPENAI_BASE_URL")
OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY")
VERBOSE: bool = os.getenv("VERBOSE") == "true"
OPENAI_BASE_URL: str = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com")
OPENAI_API_KEY: str = os.environ.get("OPENAI_API_KEY", "")
VERBOSE: bool = os.environ.get("VERBOSE") == "true"
LOG_MESSAGE: bool = os.environ.get("LOG_MESSAGE") == "true"
LOG_REQUEST_BODY: bool = os.environ.get("LOG_REQUEST_BODY") == "true"


env_config = EnvConfig()
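
Note that the new logging flags compare against the literal string "true" and their defaults are captured when app.env_config is imported, so the environment has to be set beforehand. A minimal sketch (the flag values below are assumptions for illustration, not part of this commit):

# Sketch: set the flags in the process environment before importing app.env_config.
import os

os.environ["LOG_MESSAGE"] = "true"
os.environ["LOG_REQUEST_BODY"] = "true"

from app.env_config import env_config

assert env_config.LOG_MESSAGE and env_config.LOG_REQUEST_BODY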
145 changes: 89 additions & 56 deletions app/main.py
@@ -8,95 +8,128 @@
from .utils import get_openai_client, get_request_hash, stream_response
from fastapi.middleware.cors import CORSMiddleware
from .env_config import env_config
from .telemetry import init_telemetry, tracer, Timer
from typing import Annotated
from fastapi import Header

# Initialize the database when the application starts
init_db()

app = FastAPI()
init_telemetry(app)

# Add CORS middleware configuration
app.add_middleware(
CORSMiddleware,
allow_origins=[
"*"
], # Allow all sources, you can set specific domains based on your needs
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"], # Allow all methods
allow_headers=["*"], # Allow all request headers
allow_methods=["*"],
allow_headers=["*"],
)


async def process_chat_request(request: Request, use_cache: bool):
body = await request.json()

print(json.dumps(body, indent=2, ensure_ascii=False))

if env_config.VERBOSE:
print("Verbose: Message contents")
for message in body.get("messages", []):
print(f"Role: {message.get('role')}")
print(f"Content: {message.get('content')}")
print("---")

print("\nVerbose: Request body (excluding message contents)")
body_without_content = copy.deepcopy(body)
if "messages" in body_without_content:
for message in body_without_content["messages"]:
if "content" in message:
message["content"] = "[CONTENT REMOVED]"
print(json.dumps(body_without_content, indent=2, ensure_ascii=False))

chat_request = ChatCompletionRequest(**body)
client = get_openai_client(request.headers.get("Authorization"))

if use_cache:
request_hash = get_request_hash(body)
cached_response = check_cache(request_hash)
if cached_response:
print("hit cache")
return cached_response

if chat_request.stream:
return StreamingResponse(
stream_response(
client, chat_request, use_cache, request_hash if use_cache else ""
),
media_type="text/event-stream",
)
else:
response = client.chat.completions.create(**chat_request.model_dump())
async def process_chat_request(
request: Request, use_cache: bool, authorization: Annotated[str, Header()]
):
with tracer.start_as_current_span("process_chat_request") as span:
body = await request.json()
span.set_attribute("request.use_cache", use_cache)

if env_config.LOG_MESSAGE:
print(json.dumps(body, indent=2, ensure_ascii=False))
print("Verbose: Message contents")
for message in body.get("messages", []):
print(f"Role: {message.get('role')}")
print(f"Content: {message.get('content')}")
print("---")

print("\nVerbose: Request body (excluding message contents)")
body_without_content = copy.deepcopy(body)
if "messages" in body_without_content:
for message in body_without_content["messages"]:
if "content" in message:
message["content"] = "[CONTENT REMOVED]"
print(json.dumps(body_without_content, indent=2, ensure_ascii=False))

chat_request = ChatCompletionRequest(**body)
client = get_openai_client(authorization)

if use_cache:
print("add to cache")
cache_response(request_hash, json.dumps(body), response.to_json(), False)
try:
return ChatCompletionResponse(**response.to_dict())
except Exception as e:
return {"error": response.to_dict()}
request_hash = get_request_hash(body)
with tracer.start_span("cache_lookup") as cache_span:
with Timer() as timer:
cached_response = check_cache(request_hash)
cache_span.set_attribute("cache_lookup.duration_ms", timer.duration)
cache_span.set_attribute("cache.hit", cached_response is not None)

if cached_response:
print("hit cache")
return cached_response

first_token_timer = Timer()
if chat_request.stream:
with first_token_timer:
stream_gen = stream_response(
client,
chat_request,
use_cache,
request_hash if use_cache else "",
first_token_timer,
)
return StreamingResponse(
stream_gen,
media_type="text/event-stream",
)
else:
with first_token_timer:
response = client.chat.completions.create(**chat_request.model_dump())

span.set_attribute("first_token_latency_ms", first_token_timer.duration)

if use_cache:
print("add to cache")
with tracer.start_span("cache_write") as cache_span:
with Timer() as timer:
cache_response(
request_hash, json.dumps(body), response.to_json(), False
)
cache_span.set_attribute("cache_write.duration_ms", timer.duration)

try:
return ChatCompletionResponse(**response.to_dict())
except Exception as e:
return {"error": response.to_dict()}


@app.post("/cache/chat/completions")
@app.post("/cache/v1/chat/completions")
async def cache_chat_completion(request: Request):
async def cache_chat_completion(
request: Request, authorization: Annotated[str, Header()]
):
try:
return await process_chat_request(request, use_cache=True)
return await process_chat_request(
request, use_cache=True, authorization=authorization
)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))


@app.post("/chat/completions")
@app.post("/v1/chat/completions")
async def chat_completion(request: Request):
async def chat_completion(request: Request, authorization: Annotated[str, Header()]):
try:
return await process_chat_request(request, use_cache=False)
return await process_chat_request(
request, use_cache=False, authorization=authorization
)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))


@app.get("/cache/models")
@app.get("/models")
async def get_models(request: Request):
async def get_models(request: Request, authorization: Annotated[str, Header()]):
try:
client = get_openai_client(request.headers.get("Authorization"))
client = get_openai_client(authorization)
return client.models.list()
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
45 changes: 45 additions & 0 deletions app/telemetry.py
@@ -0,0 +1,45 @@
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from fastapi import FastAPI
import time

# Initialize the tracer provider
resource = Resource.create({ResourceAttributes.SERVICE_NAME: "llm-cache-server"})

tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(tracer_provider)

# Get a tracer for this module
tracer = trace.get_tracer(__name__)


def init_telemetry(app: FastAPI):
"""初始化 FastAPI 的 OpenTelemetry instrumentation"""
FastAPIInstrumentor.instrument_app(app)


class Timer:
"""用于测量时间的上下文管理器"""

def __init__(self):
self.start_time = None
self.end_time = None

def __enter__(self):
self.start_time = time.time()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.end_time = time.time()

@property
def duration(self):
"""返回持续时间(毫秒)"""
if self.start_time is None or self.end_time is None:
return 0
return (self.end_time - self.start_time) * 1000  # convert to milliseconds
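
For reference, a minimal sketch of how the exported tracer and Timer are meant to be combined, mirroring the cache_lookup span in app/main.py from this commit (lookup_value is a hypothetical stand-in for check_cache):

from app.telemetry import tracer, Timer

def traced_lookup(key: str):
    # Time the operation inside a span and attach the duration as an attribute,
    # the same pattern app/main.py uses for cache_lookup and cache_write.
    with tracer.start_span("cache_lookup") as span:
        with Timer() as timer:
            result = lookup_value(key)  # hypothetical helper
        span.set_attribute("cache_lookup.duration_ms", timer.duration)
        span.set_attribute("cache.hit", result is not None)
    return result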
20 changes: 12 additions & 8 deletions app/utils.py
@@ -6,18 +6,15 @@
from .cache import cache_response
from openai.types.chat import ChatCompletionChunk
from .env_config import env_config
import json


def get_openai_client(auth_header: str):
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split("Bearer ")[1]
api_key = token if len(token) > 5 else env_config.OPENAI_API_KEY
else:
api_key = env_config.OPENAI_API_KEY
def get_openai_client(authorization: str):
api_key = authorization.split(" ")[1] if authorization else None

return openai.OpenAI(
base_url=env_config.OPENAI_BASE_URL,
api_key=api_key,
api_key=api_key or env_config.OPENAI_API_KEY,
)


@@ -43,15 +40,22 @@ async def stream_response(
client: openai.OpenAI,
chat_request: ChatCompletionRequest,
use_cache: bool,
request_hash: str | None = None,
request_hash: str,
first_token_timer=None,
):
first_chunk = True
response_chunks = []
response = client.chat.completions.create(
**chat_request.model_dump(exclude={"stream"}),
stream=True,
)

for chunk in response:
if first_chunk and first_token_timer:
first_token_timer.__exit__(None, None, None)
first_chunk = False

# use a dict to avoid escaped newlines from JSON serialization
chunk_dict = chunk.to_dict()
if use_cache:
response_chunks.append(chunk)
6 changes: 5 additions & 1 deletion requirements.txt
@@ -2,4 +2,8 @@ fastapi==0.111.0
uvicorn==0.29.0
pydantic==2.6.4
openai==1.58.1
python-dotenv==1.0.0
python-dotenv==1.0.0
opentelemetry-api>=1.21.0
opentelemetry-sdk>=1.21.0
opentelemetry-instrumentation-fastapi>=0.42b0
opentelemetry-exporter-otlp>=1.21.0
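
opentelemetry-exporter-otlp is added here, but app/telemetry.py only wires a ConsoleSpanExporter. A minimal sketch of swapping in the OTLP exporter, assuming a collector listening on localhost:4317 (not part of this commit):

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from app.telemetry import tracer_provider

# Export spans to a local OTLP collector instead of (or in addition to) the console.
tracer_provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True))
)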
