Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conversation Monitor #41

Merged
merged 2 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 37 additions & 49 deletions phi/api/conversation.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,82 @@
from os import getenv
from typing import Optional, Union, Dict, List
from typing import Union, Dict, List

from httpx import Response

from phi.api.api import api, invalid_response
from phi.api.routes import ApiRoutes
from phi.api.schemas.conversation import (
ConversationEventCreate,
ConversationWorkspace,
ConversationEventCreateResopnse,
ConversationUpdate,
ConversationSchema,
ConversationEventCreate,
ConversationMonitorCreate,
)
from phi.constants import WORKSPACE_ID_ENV_VAR, WORKSPACE_HASH_ENV_VAR
from phi.constants import WORKSPACE_ID_ENV_VAR, WORKSPACE_HASH_ENV_VAR, WORKSPACE_KEY_ENV_VAR
from phi.cli.settings import phi_cli_settings
from phi.utils.common import str_to_int
from phi.utils.log import logger


def upsert_conversation(conversation: ConversationUpdate) -> Optional[ConversationSchema]:
def create_conversation_monitor(monitor: ConversationMonitorCreate) -> bool:
if not phi_cli_settings.api_enabled:
return None
return True

logger.debug("--o-o-- Conversation upsert")
with api.Client() as api_client:
logger.debug("--o-o-- Creating Conversation Monitor")
with api.AuthenticatedClient() as api_client:
try:
workspace_id = str_to_int(getenv(WORKSPACE_ID_ENV_VAR))
if workspace_id is None:
return None
workspace_hash = getenv(WORKSPACE_HASH_ENV_VAR)
if workspace_hash is None:
return None
workspace = ConversationWorkspace(id_workspace=workspace_id, ws_hash=workspace_hash)

conversation_workspace = ConversationWorkspace(
id_workspace=str_to_int(getenv(WORKSPACE_ID_ENV_VAR)),
ws_hash=getenv(WORKSPACE_HASH_ENV_VAR),
ws_key=getenv(WORKSPACE_KEY_ENV_VAR),
)
r: Response = api_client.post(
ApiRoutes.CONVERSATION_UPSERT,
ApiRoutes.CONVERSATION_MONITOR_CREATE,
json={
"conversation": conversation.model_dump(exclude_none=True),
"workspace": workspace.model_dump(exclude_none=True),
"monitor": monitor.model_dump(exclude_none=True),
"workspace": conversation_workspace.model_dump(exclude_none=True),
},
)
if invalid_response(r):
return None
return False

response_json: Union[Dict, List] = r.json()
if response_json is None:
return None
return False

conversation_schema: Optional[ConversationSchema] = ConversationSchema.model_validate(response_json)
# logger.debug(f"Conversation: {conversation_schema}")
return conversation_schema
logger.debug(f"Response: {response_json}")
return True
except Exception as e:
logger.debug(f"Could not update conversation: {e}")
return None
logger.debug(f"Could not create conversation monitor: {e}")
return False


def log_conversation_event(conversation: ConversationEventCreate) -> Optional[ConversationEventCreateResopnse]:
def create_conversation_event(conversation: ConversationEventCreate) -> bool:
if not phi_cli_settings.api_enabled:
return None
return True

logger.debug("--o-o-- Log conversation event")
with api.Client() as api_client:
logger.debug("--o-o-- Creating Conversation Event")
with api.AuthenticatedClient() as api_client:
try:
workspace_id = str_to_int(getenv(WORKSPACE_ID_ENV_VAR))
if workspace_id is None:
return None
workspace_hash = getenv(WORKSPACE_HASH_ENV_VAR)
if workspace_hash is None:
return None
workspace = ConversationWorkspace(id_workspace=workspace_id, ws_hash=workspace_hash)

conversation_workspace = ConversationWorkspace(
id_workspace=str_to_int(getenv(WORKSPACE_ID_ENV_VAR)),
ws_hash=getenv(WORKSPACE_HASH_ENV_VAR),
ws_key=getenv(WORKSPACE_KEY_ENV_VAR),
)
r: Response = api_client.post(
ApiRoutes.CONVERSATION_EVENT_CREATE,
json={
"event": conversation.model_dump(exclude_none=True),
"workspace": workspace.model_dump(exclude_none=True),
"workspace": conversation_workspace.model_dump(exclude_none=True),
},
)
if invalid_response(r):
return None
return False

response_json: Union[Dict, List] = r.json()
if response_json is None:
return None
return False

conversation_event_response: ConversationEventCreateResopnse = (
ConversationEventCreateResopnse.model_validate(response_json)
)
# logger.debug(f"Conversation response: {conversation_event_response}")
return conversation_event_response
logger.debug(f"Response: {response_json}")
return True
except Exception as e:
logger.debug(f"Could not log conversation event: {e}")
return None
return False
2 changes: 1 addition & 1 deletion phi/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class ApiRoutes:
MONITOR_EVENT_CREATE: str = "/v1/monitor/event/create"

# conversation paths
CONVERSATION_UPSERT: str = "/v1/conversation/upsert"
CONVERSATION_MONITOR_CREATE: str = "/v1/conversation/monitor/create"
CONVERSATION_EVENT_CREATE: str = "/v1/conversation/event/create"

# ai paths
Expand Down
37 changes: 10 additions & 27 deletions phi/api/schemas/conversation.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,25 @@
from datetime import datetime
from typing import Optional, Dict, Any

from pydantic import BaseModel


class ConversationWorkspace(BaseModel):
id_workspace: int
id_workspace: Optional[int] = None
ws_hash: Optional[str] = None
ws_key: Optional[str] = None


class ConversationEventCreate(BaseModel):
"""Data sent to API to create a new conversation event"""

id_user: Optional[int] = None
conversation_key: str
conversation_data: Optional[Dict[str, Any]] = None
event_type: str
event_data: Optional[Dict[str, Any]] = None


class ConversationEventCreateResopnse(BaseModel):
id_event: Optional[int] = None
id_conversation: Optional[str] = None

class ConversationMonitorCreate(BaseModel):
"""Data sent to API to create a conversation monitor"""

class ConversationUpdate(BaseModel):
"""Data sent to API to update a conversation"""

conversation_key: str
conversation_id: str
conversation_data: Optional[Dict[str, Any]] = None


class ConversationSchema(BaseModel):
"""Schema for a conversation returned by API"""
class ConversationEventCreate(BaseModel):
"""Data sent to API to create a new conversation event"""

id_conversation: Optional[str] = None
id_workspace: Optional[int] = None
conversation_key: Optional[str] = None
conversation_id: str
conversation_data: Optional[Dict[str, Any]] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
event_type: str
event_data: Optional[Dict[str, Any]] = None
2 changes: 1 addition & 1 deletion phi/aws/app/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class FastApi(AwsApp):

# -*- Image Configuration
image_name: str = "phidata/fastapi"
image_tag: str = "0.96"
image_tag: str = "0.104"
command: Optional[Union[str, List[str]]] = "uvicorn main:app --reload"

# -*- App Ports
Expand Down
2 changes: 1 addition & 1 deletion phi/aws/app/jupyter/jupyter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Jupyter(AwsApp):

# -*- Image Configuration
image_name: str = "phidata/jupyter"
image_tag: str = "3.6.3"
image_tag: str = "4.0.5"
command: Optional[Union[str, List[str]]] = "jupyter lab"

# -*- App Ports
Expand Down
2 changes: 1 addition & 1 deletion phi/aws/app/streamlit/streamlit.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Streamlit(AwsApp):

# -*- Image Configuration
image_name: str = "phidata/streamlit"
image_tag: str = "1.23"
image_tag: str = "1.27"
command: Optional[Union[str, List[str]]] = "streamlit hello"

# -*- App Ports
Expand Down
4 changes: 4 additions & 0 deletions phi/cli/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ def update_signin_url(cls, v, info: FieldValidationInfo):
def update_api_url(cls, v, info: FieldValidationInfo):
api_runtime = info.data["api_runtime"]
if api_runtime == "dev":
from os import getenv

if getenv("PHI_RUNTIME") == "docker":
return "http://host.docker.internal:7070"
return "http://localhost:7070"
elif api_runtime == "stg":
return "https://api.stgphi.com"
Expand Down
1 change: 1 addition & 0 deletions phi/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
WORKSPACES_MOUNT_ENV_VAR: str = "PHI_WORKSPACES_MOUNT"
WORKSPACE_ID_ENV_VAR: str = "PHI_WORKSPACE_ID"
WORKSPACE_HASH_ENV_VAR: str = "PHI_WORKSPACE_HASH"
WORKSPACE_KEY_ENV_VAR: str = "PHI_WORKSPACE_KEY"
WORKSPACE_DIR_ENV_VAR: str = "PHI_WORKSPACE_DIR"
REQUIREMENTS_FILE_PATH_ENV_VAR: str = "REQUIREMENTS_FILE_PATH"

Expand Down
38 changes: 18 additions & 20 deletions phi/conversation/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def start(self) -> Optional[str]:
raise Exception("Failed to create new conversation in storage")
logger.debug(f"-*- Created conversation: {self.database_row.id}")
self.from_database_row(row=self.database_row)
self._api_upsert_conversation()
self._api_log_conversation_monitor()
return self.id

def end(self) -> None:
Expand Down Expand Up @@ -558,7 +558,7 @@ def _chat(self, message: str, stream: bool = True) -> Iterator[str]:
"references": references.model_dump(exclude_none=True) if references else None,
"metrics": self.llm.metrics,
}
self._api_send_conversation_event(event_type="chat", event_data=event_data)
self._api_log_conversation_event(event_type="chat", event_data=event_data)

# -*- Yield final response if not streaming
if not stream:
Expand Down Expand Up @@ -612,7 +612,7 @@ def _chat_raw(
"messages": [m.model_dump(exclude_none=True) for m in messages],
"metrics": self.llm.metrics,
}
self._api_send_conversation_event(event_type="chat_raw", event_data=event_data)
self._api_log_conversation_event(event_type="chat_raw", event_data=event_data)

# -*- Yield final response if not streaming
if not stream:
Expand All @@ -636,8 +636,8 @@ def rename(self, name: str) -> None:
# -*- Save conversation to storage
self.write_to_storage()

# -*- Update conversation
self._api_upsert_conversation()
# -*- Log conversation monitor
self._api_log_conversation_monitor()

def generate_name(self) -> str:
"""Generate a name for the conversation using chat history"""
Expand Down Expand Up @@ -672,52 +672,50 @@ def auto_rename(self) -> None:
# -*- Save conversation to storage
self.write_to_storage()

# -*- Update conversation
self._api_upsert_conversation()
# -*- Log conversation monitor
self._api_log_conversation_monitor()

###########################################################################
# Api functions
###########################################################################

def _api_upsert_conversation(self):
def _api_log_conversation_monitor(self):
if not self.monitoring:
return

from phi.api.conversation import upsert_conversation, ConversationUpdate
from phi.api.conversation import create_conversation_monitor, ConversationMonitorCreate

logger.debug("Sending conversation event")
try:
database_row: ConversationRow = self.database_row or self.to_database_row()
upsert_conversation(
conversation=ConversationUpdate(
conversation_key=database_row.conversation_key(),
create_conversation_monitor(
monitor=ConversationMonitorCreate(
conversation_id=database_row.id,
conversation_data=database_row.conversation_data(),
),
)
except Exception as e:
logger.debug(f"Could not log conversation event: {e}")
logger.debug(f"Could not create conversation monitor: {e}")

def _api_send_conversation_event(
def _api_log_conversation_event(
self, event_type: str = "chat", event_data: Optional[Dict[str, Any]] = None
) -> None:
if not self.monitoring:
return

from phi.api.conversation import log_conversation_event, ConversationEventCreate
from phi.api.conversation import create_conversation_event, ConversationEventCreate

logger.debug("Sending conversation event")
try:
database_row: ConversationRow = self.database_row or self.to_database_row()
log_conversation_event(
create_conversation_event(
conversation=ConversationEventCreate(
conversation_key=database_row.conversation_key(),
conversation_id=database_row.id,
conversation_data=database_row.conversation_data(),
event_type=event_type,
event_data=event_data,
),
)
except Exception as e:
logger.debug(f"Could not log conversation event: {e}")
logger.debug(f"Could not create conversation event: {e}")

###########################################################################
# LLM functions
Expand Down
11 changes: 0 additions & 11 deletions phi/conversation/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,3 @@ def conversation_data(self) -> Dict[str, Any]:
_dict["created_at"] = self.created_at.isoformat() if self.created_at else None
_dict["updated_at"] = self.updated_at.isoformat() if self.updated_at else None
return _dict

def conversation_key(self) -> str:
"""Returns the conversation key."""
keys = []
if self.user_name:
keys.append(self.user_name)
if self.user_type:
keys.append(self.user_type)
if self.id:
keys.append(str(self.id))
return "::".join(keys)
2 changes: 1 addition & 1 deletion phi/docker/app/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class FastApi(DockerApp):

# -*- Image Configuration
image_name: str = "phidata/fastapi"
image_tag: str = "0.96"
image_tag: str = "0.104"
command: Optional[Union[str, List[str]]] = "uvicorn main:app --reload"

# -*- App Ports
Expand Down
2 changes: 1 addition & 1 deletion phi/docker/app/jupyter/jupyter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Jupyter(DockerApp):

# -*- Image Configuration
image_name: str = "phidata/jupyter"
image_tag: str = "3.6.3"
image_tag: str = "4.0.5"
command: Optional[Union[str, List[str]]] = "jupyter lab"

# -*- App Ports
Expand Down
2 changes: 1 addition & 1 deletion phi/docker/app/streamlit/streamlit.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Streamlit(DockerApp):

# -*- Image Configuration
image_name: str = "phidata/streamlit"
image_tag: str = "1.23"
image_tag: str = "1.27"
command: Optional[Union[str, List[str]]] = "streamlit hello"

# -*- App Ports
Expand Down
2 changes: 1 addition & 1 deletion phi/k8s/app/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class FastApi(K8sApp):

# -*- Image Configuration
image_name: str = "phidata/fastapi"
image_tag: str = "0.96"
image_tag: str = "0.104"
command: Optional[Union[str, List[str]]] = "uvicorn main:app --reload"

# -*- App Ports
Expand Down
Loading