feat: Add langfuse for prompt tracing
VVoruganti committed Dec 23, 2024
1 parent c2ef6f0 commit dd660ae
Showing 6 changed files with 273 additions and 115 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -22,6 +22,7 @@ dependencies = [
     "anthropic>=0.36.0",
     "nanoid>=2.0.0",
     "alembic>=1.14.0",
+    "langfuse>=2.57.1",
 ]
 [tool.uv]
 dev-dependencies = [
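
Side note: the langfuse decorator SDK added here is configured entirely through environment variables, so the dependency bump is the only project-file change needed. A minimal sketch of the expected setup (the key values below are placeholders, not credentials from this repository):

# Hypothetical bootstrap, e.g. in a local test harness; langfuse reads these
# variables itself when the first trace is created.
import os

os.environ.setdefault("LANGFUSE_PUBLIC_KEY", "pk-lf-...")
os.environ.setdefault("LANGFUSE_SECRET_KEY", "sk-lf-...")
os.environ.setdefault("LANGFUSE_HOST", "https://cloud.langfuse.com")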
58 changes: 43 additions & 15 deletions src/agent.py
@@ -5,6 +5,7 @@
 import sentry_sdk
 from anthropic import Anthropic, MessageStreamManager
 from dotenv import load_dotenv
+from langfuse.decorators import langfuse_context, observe
 from sentry_sdk.ai.monitoring import ai_track
 from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -37,10 +38,16 @@ def __init__(self, agent_input: str, user_representation: str, chat_history: str
         self.agent_input = agent_input
         self.user_representation = user_representation
         self.chat_history = chat_history
-        self.client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
+        self.client = Anthropic(
+            api_key=os.getenv("ANTHROPIC_API_KEY"),
+            # base_url="https://gateway.usevelvet.com/api/anthropic/",
+            # default_headers={"velvet-auth": os.getenv("VELVET_API_KEY", "default")},
+        )
         self.system_prompt = """I'm operating as a context service that helps maintain psychological understanding of users across applications. Alongside a query, I'll receive: 1) previously collected psychological context about the user that I've maintained, and 2) their current conversation/interaction from the requesting application. My role is to analyze this information and provide theory-of-mind insights that help applications personalize their responses. Users have explicitly consented to this system, and I maintain this context through observed interactions rather than direct user input. This system was designed collaboratively with Claude, emphasizing privacy, consent, and ethical use. Please respond in a brief, matter-of-fact, and appropriate manner to convey as much relevant information to the application based on its query and the user's most recent message. If the context provided doesn't help address the query, write absolutely NOTHING but "None"."""
+        self.model = "claude-3-5-sonnet-20240620"

     @ai_track("Dialectic Call")
+    @observe(as_type="generation")
     def call(self):
         with sentry_sdk.start_transaction(
             op="dialectic-inference", name="Dialectic API Response"
@@ -51,20 +58,27 @@ def call(self):
             <conversation_history>{self.chat_history}</conversation_history>
             """

+            messages = [
+                {
+                    "role": "user",
+                    "content": prompt,
+                }
+            ]
+
+            langfuse_context.update_current_observation(
+                input=messages, model=self.model
+            )
+
             response = self.client.messages.create(
                 system=self.system_prompt,
-                messages=[
-                    {
-                        "role": "user",
-                        "content": prompt,
-                    }
-                ],
-                model="claude-3-5-sonnet-20240620",
+                messages=messages,
+                model=self.model,
                 max_tokens=150,
             )
             return response.content

     @ai_track("Dialectic Call")
+    @observe(as_type="generation")
     def stream(self):
         with sentry_sdk.start_transaction(
             op="dialectic-inference", name="Dialectic API Response"
@@ -74,15 +88,21 @@ def stream(self):
             <context>{self.user_representation}</context>
             <conversation_history>{self.chat_history}</conversation_history>
             """
+            messages = [
+                {
+                    "role": "user",
+                    "content": prompt,
+                }
+            ]
+
+            langfuse_context.update_current_observation(
+                input=messages, model=self.model
+            )
+
             return self.client.messages.stream(
-                model="claude-3-5-sonnet-20240620",
+                model=self.model,
                 system=self.system_prompt,
-                messages=[
-                    {
-                        "role": "user",
-                        "content": prompt,
-                    }
-                ],
+                messages=messages,
                 max_tokens=150,
             )

@@ -125,6 +145,7 @@ async def get_latest_user_representation(
     )


+@observe()
 async def chat(
     app_id: str,
     user_id: str,
@@ -149,6 +170,13 @@ async def chat(
         chat_history=history,
     )

+    langfuse_context.update_current_trace(
+        session_id=session_id,
+        user_id=user_id,
+        release=os.getenv("SENTRY_RELEASE"),
+        metadata={"environment": os.getenv("SENTRY_ENVIRONMENT")},
+    )
+
     if stream:
         return chain.stream()
     response = chain.call()
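
The pattern in src/agent.py — decorate the model call with @observe(as_type="generation"), then attach the exact input and model name via langfuse_context.update_current_observation before the API call — is the standard langfuse v2 decorator flow. A condensed sketch of the same idea, with fake_llm standing in for the Anthropic client (not code from this commit):

from langfuse.decorators import langfuse_context, observe


def fake_llm(messages: list[dict]) -> str:
    # Stand-in for client.messages.create(...); returns a canned reply.
    return "stub response"


@observe(as_type="generation")
def answer(question: str) -> str:
    messages = [{"role": "user", "content": question}]
    # Record the request on the current generation before the call...
    langfuse_context.update_current_observation(
        input=messages, model="claude-3-5-sonnet-20240620"
    )
    output = fake_llm(messages)
    # ...and the completion afterwards, so both sides land in the trace.
    langfuse_context.update_current_observation(output=output)
    return output


print(answer("What do you know about this user?"))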
1 change: 0 additions & 1 deletion src/deriver/__main__.py
@@ -7,4 +7,3 @@
 if __name__ == "__main__":
     asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
     asyncio.run(main())
-
23 changes: 22 additions & 1 deletion src/deriver/consumer.py
@@ -1,7 +1,9 @@
 import logging
+import os
 import re

 import sentry_sdk
+from langfuse.decorators import langfuse_context, observe
 from rich.console import Console
 from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -49,6 +51,7 @@ async def process_item(db: AsyncSession, payload: dict):


 @sentry_sdk.trace
+@observe()
 async def process_ai_message(
     content: str,
     app_id: str,
@@ -84,6 +87,13 @@ async def process_ai_message(
     # append current message to chat history
     chat_history_str = f"{chat_history_str}\nai: {content}"

+    langfuse_context.update_current_trace(
+        session_id=session_id,
+        user_id=user_id,
+        release=os.getenv("SENTRY_RELEASE"),
+        metadata={"environment": os.getenv("SENTRY_ENVIRONMENT")},
+    )
+
     tom_inference_response = await tom_inference(
         chat_history_str, session_id=session_id
     )
@@ -105,6 +115,7 @@


 @sentry_sdk.trace
+@observe()
 async def process_user_message(
     content: str,
     app_id: str,
@@ -183,6 +194,13 @@ async def process_user_message(
         existing_representation.content if existing_representation else "None"
     )

+    langfuse_context.update_current_trace(
+        session_id=session_id,
+        user_id=user_id,
+        release=os.getenv("SENTRY_RELEASE"),
+        metadata={"environment": os.getenv("SENTRY_ENVIRONMENT")},
+    )
+
     # Call user_representation
     user_representation_response = await user_representation(
         chat_history=f"{ai_message.content}\nhuman: {content}",
@@ -204,7 +222,10 @@
             user_representation_response, "representation"
         )

-        console.print(f"User Representation:\n{user_representation_response}", style="bright_green")
+        console.print(
+            f"User Representation:\n{user_representation_response}",
+            style="bright_green",
+        )

     else:
         raise Exception(
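
One operational note on the deriver changes: update_current_trace here stitches the decorated worker functions into per-session, per-user traces, but the langfuse SDK buffers events and ships them from a background thread, so a short-lived worker process should flush before exiting or its final traces can be dropped. A minimal sketch of that shutdown step (the process() body is a placeholder, not this repository's worker loop):

import asyncio

from langfuse.decorators import langfuse_context, observe


@observe()
async def process(payload: dict) -> None:
    # Placeholder for the real work, e.g. process_ai_message / process_user_message.
    print(f"processed {payload}")


async def main() -> None:
    await process({"content": "hello"})
    # Drain buffered spans/generations before the event loop shuts down.
    langfuse_context.flush()


if __name__ == "__main__":
    asyncio.run(main())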
(Diffs for the 2 remaining changed files were not loaded.)
