Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vineeth/dev 516 hotfix #83

Merged
merged 4 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

- Alembic for handling database migrations
- Additional indexes for reading Messages and Metamessages
- Langfuse for prompt tracing

### Changed

Expand Down
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ RUN --mount=type=cache,target=/root/.cache/uv \
ENV PATH="/app/.venv/bin:$PATH"

COPY --chown=app:app src/ /app/src/
COPY --chown=app:app migrations/ /app/migrations/
COPY --chown=app:app alembic.ini /app/alembic.ini

EXPOSE 8000

Expand Down
58 changes: 52 additions & 6 deletions migrations/env.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import logging
import os
import sys
from logging.config import fileConfig
from pathlib import Path

from alembic import context
from dotenv import load_dotenv
from sqlalchemy import engine_from_config, pool
from sqlalchemy import engine_from_config, pool, text

# Set up logging more verbosely
logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
logging.getLogger("alembic").setLevel(logging.DEBUG)

# Import your models
from src.db import Base
Expand Down Expand Up @@ -37,8 +43,18 @@
# ... etc.


def get_url():
return os.getenv("CONNECTION_URI")
def include_name(name, type_, parent_names):
    """Alembic ``include_name`` hook limiting autogenerate to one schema.

    Every non-schema object (tables, indexes, constraints, ...) is kept;
    a schema name is kept only when it matches ``target_metadata.schema``.
    """
    if type_ != "schema":
        return True
    return name == target_metadata.schema


def get_url() -> str:
    """Fetch the database connection URI from the environment.

    Returns:
        The value of the ``CONNECTION_URI`` environment variable.

    Raises:
        ValueError: when ``CONNECTION_URI`` is unset.
    """
    if (url := os.getenv("CONNECTION_URI")) is None:
        raise ValueError("CONNECTION_URI environment variable is not set")
    return url


def run_migrations_offline() -> None:
Expand All @@ -53,16 +69,23 @@ def run_migrations_offline() -> None:
script output.

"""
# url = config.get_main_option("sqlalchemy.url")
url = get_url()

print(target_metadata.schema)

context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
version_table_schema=target_metadata.schema, # This sets schema for version table
include_schemas=True,
include_name=include_name,
)

with context.begin_transaction():
context.execute(f"create schema if not exists {target_metadata.schema};")
context.execute(f"SET search_path TO {target_metadata.schema}")
context.run_migrations()


Expand All @@ -74,19 +97,42 @@ def run_migrations_online() -> None:

"""
configuration = config.get_section(config.config_ini_section)
configuration["sqlalchemy.url"] = get_url()
if configuration is None:
configuration = {}

url = get_url()
configuration["sqlalchemy.url"] = url

print(f"Debug - Target metadata schema: {target_metadata.schema}")

connectable = engine_from_config(
configuration,
prefix="sqlalchemy.",
echo=True,
poolclass=pool.NullPool,
)

with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
# Set and verify search_path
connection.execute(text(f"SET search_path TO {target_metadata.schema}, public"))

# make use of non-supported SQLAlchemy attribute to ensure
# the dialect reflects tables in terms of the current tenant name
# connection.dialect.default_schema_name = target_metadata.schema

context.configure(
connection=connection,
target_metadata=target_metadata,
version_table_schema=target_metadata.schema,
include_schemas=True,
include_name=include_name,
transaction_per_migration=True,
transactional_ddl=True,
)

with context.begin_transaction():
context.run_migrations()
connection.commit()


if context.is_offline_mode():
Expand Down
31 changes: 18 additions & 13 deletions migrations/versions/c3828084f472_add_indexes_for_messages_and_.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

"""

from typing import Sequence, Union
from collections.abc import Sequence
from os import getenv
from typing import Union

import sqlalchemy as sa
from sqlalchemy import text
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = "c3828084f472"
Expand All @@ -21,9 +21,14 @@


def upgrade() -> None:
schema = getenv("DATABASE_SCHEMA", "public")
# Add new indexes
op.create_index("idx_users_app_lookup", "users", ["app_id", "public_id"])
op.create_index("idx_sessions_user_lookup", "sessions", ["user_id", "public_id"])
op.create_index(
"idx_users_app_lookup", "users", ["app_id", "public_id"], schema=schema
)
op.create_index(
"idx_sessions_user_lookup", "sessions", ["user_id", "public_id"], schema=schema
)

op.create_index(
"idx_messages_session_lookup",
Expand All @@ -32,10 +37,9 @@ def upgrade() -> None:
postgresql_include=[
"public_id",
"is_user",
"content",
"metadata",
"created_at",
],
schema=schema,
)

op.create_index(
Expand All @@ -44,17 +48,18 @@ def upgrade() -> None:
["metamessage_type", sa.text("id DESC")],
postgresql_include=[
"public_id",
"content",
"message_id",
"created_at",
"metadata",
],
schema=schema,
)


def downgrade() -> None:
    """Drop the indexes added by this revision.

    Honors the DATABASE_SCHEMA environment variable (defaulting to
    "public") so the drops target the same schema the indexes were
    created in by ``upgrade()``.
    """
    schema = getenv("DATABASE_SCHEMA", "public")

    # Remove new indexes — (index, table) pairs mirror upgrade() exactly.
    # Each drop is schema-qualified; un-qualified duplicates removed.
    indexes = (
        ("idx_users_app_lookup", "users"),
        ("idx_sessions_user_lookup", "sessions"),
        ("idx_messages_session_lookup", "messages"),
        ("idx_metamessages_lookup", "metamessages"),
    )
    for index_name, table_name in indexes:
        op.drop_index(index_name, table_name=table_name, schema=schema)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies = [
"anthropic>=0.36.0",
"nanoid>=2.0.0",
"alembic>=1.14.0",
"langfuse>=2.57.1",
]
[tool.uv]
dev-dependencies = [
Expand Down
58 changes: 43 additions & 15 deletions src/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sentry_sdk
from anthropic import Anthropic, MessageStreamManager
from dotenv import load_dotenv
from langfuse.decorators import langfuse_context, observe
from sentry_sdk.ai.monitoring import ai_track
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
Expand Down Expand Up @@ -37,10 +38,16 @@ def __init__(self, agent_input: str, user_representation: str, chat_history: str
self.agent_input = agent_input
self.user_representation = user_representation
self.chat_history = chat_history
self.client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
self.client = Anthropic(
api_key=os.getenv("ANTHROPIC_API_KEY"),
# base_url="https://gateway.usevelvet.com/api/anthropic/",
# default_headers={"velvet-auth": os.getenv("VELVET_API_KEY", "default")},
)
self.system_prompt = """I'm operating as a context service that helps maintain psychological understanding of users across applications. Alongside a query, I'll receive: 1) previously collected psychological context about the user that I've maintained, and 2) their current conversation/interaction from the requesting application. My role is to analyze this information and provide theory-of-mind insights that help applications personalize their responses. Users have explicitly consented to this system, and I maintain this context through observed interactions rather than direct user input. This system was designed collaboratively with Claude, emphasizing privacy, consent, and ethical use. Please respond in a brief, matter-of-fact, and appropriate manner to convey as much relevant information to the application based on its query and the user's most recent message. If the context provided doesn't help address the query, write absolutely NOTHING but "None"."""
self.model = "claude-3-5-sonnet-20240620"

@ai_track("Dialectic Call")
@observe(as_type="generation")
def call(self):
with sentry_sdk.start_transaction(
op="dialectic-inference", name="Dialectic API Response"
Expand All @@ -51,20 +58,27 @@ def call(self):
<conversation_history>{self.chat_history}</conversation_history>
"""

messages = [
{
"role": "user",
"content": prompt,
}
]

langfuse_context.update_current_observation(
input=messages, model=self.model
)

response = self.client.messages.create(
system=self.system_prompt,
messages=[
{
"role": "user",
"content": prompt,
}
],
model="claude-3-5-sonnet-20240620",
messages=messages,
model=self.model,
max_tokens=150,
)
return response.content

@ai_track("Dialectic Call")
@observe(as_type="generation")
def stream(self):
with sentry_sdk.start_transaction(
op="dialectic-inference", name="Dialectic API Response"
Expand All @@ -74,15 +88,21 @@ def stream(self):
<context>{self.user_representation}</context>
<conversation_history>{self.chat_history}</conversation_history>
"""
messages = [
{
"role": "user",
"content": prompt,
}
]

langfuse_context.update_current_observation(
input=messages, model=self.model
)

return self.client.messages.stream(
model="claude-3-5-sonnet-20240620",
model=self.model,
system=self.system_prompt,
messages=[
{
"role": "user",
"content": prompt,
}
],
messages=messages,
max_tokens=150,
)

Expand Down Expand Up @@ -125,6 +145,7 @@ async def get_latest_user_representation(
)


@observe()
async def chat(
app_id: str,
user_id: str,
Expand All @@ -149,6 +170,13 @@ async def chat(
chat_history=history,
)

langfuse_context.update_current_trace(
session_id=session_id,
user_id=user_id,
release=os.getenv("SENTRY_RELEASE"),
metadata={"environment": os.getenv("SENTRY_ENVIRONMENT")},
)

if stream:
return chain.stream()
response = chain.call()
Expand Down
24 changes: 23 additions & 1 deletion src/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from alembic import command
from alembic.config import Config
from dotenv import load_dotenv
from sqlalchemy import MetaData
from sqlalchemy import MetaData, create_engine, inspect
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base

Expand Down Expand Up @@ -45,5 +45,27 @@ def scaffold_db():
"""use a sync engine for scaffolding the database. ddl operations are unavailable
with async engines
"""
# Create engine
engine = create_engine(
os.environ["CONNECTION_URI"],
pool_pre_ping=True,
echo=True,
)

# Create inspector to check if database exists
inspector = inspect(engine)

print(inspector.get_table_names(Base.metadata.schema))

# If no tables exist, create them with SQLAlchemy
if not inspector.get_table_names(Base.metadata.schema):
print("No tables found. Creating database schema...")
Base.metadata.create_all(bind=engine)

# Clean up
engine.dispose()

# Run Alembic migrations regardless
print("Running database migrations...")
alembic_cfg = Config("alembic.ini")
command.upgrade(alembic_cfg, "head")
1 change: 0 additions & 1 deletion src/deriver/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@
if __name__ == "__main__":
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
asyncio.run(main())

Loading
Loading