v2.0.8
ashpreetbedi committed Sep 28, 2023
1 parent d254514 commit 0c5c7b0
Showing 12 changed files with 672 additions and 123 deletions.
1 change: 0 additions & 1 deletion phi/ai/operator.py
@@ -10,7 +10,6 @@ def phi_ai_conversation(
     stream: bool = False,
 ) -> None:
     """Start a conversation with Phi AI."""
-
     from phi.ai.phi_ai import PhiAI
 
     conversation_type = ConversationType.AUTO if autonomous_conversation else ConversationType.RAG
224 changes: 199 additions & 25 deletions phi/ai/phi_ai.py
@@ -1,14 +1,24 @@
+import json
 from typing import Optional, Dict, List, Any, Iterator
 
+from rich import box
+from rich.prompt import Prompt
+from rich.live import Live
+from rich.table import Table
+from rich.markdown import Markdown
+
+from phi.api.ai import conversation_chat
 from phi.api.schemas.user import UserSchema
 from phi.api.schemas.ai import ConversationType
 from phi.cli.config import PhiCliConfig
 from phi.cli.console import console
 from phi.cli.settings import phi_cli_settings
-from phi.llm.schemas import Function
+from phi.llm.schemas import Function, Message, FunctionCall
 from phi.llm.function.shell import ShellScriptsRegistry
 from phi.workspace.config import WorkspaceConfig
 from phi.utils.log import logger
+from phi.utils.functions import get_function_call
+from phi.utils.timer import Timer
 from phi.utils.json_io import write_json_file, read_json_file
 
 
@@ -30,9 +30,7 @@ def __init__(
         _active_workspace = _phi_config.get_active_ws_config()
 
         self.conversation_db: Optional[List[Dict[str, Any]]] = None
-        self.functions: Dict[str, Function] = {
-            "run_shell_command": Function.from_callable(ShellScriptsRegistry.run_shell_command)
-        }
+        self.functions: Dict[str, Function] = ShellScriptsRegistry().functions
        logger.debug(f"Functions: {self.functions.keys()}")
 
         _conversation_id = None
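
The hunk above swaps the hand-built dict with a single "run_shell_command" entry for whatever function map ShellScriptsRegistry exposes. A rough, hypothetical sketch of a registry that publishes a functions dict built from its callables (names and structure here are assumptions, not the actual phi.llm code):

from typing import Callable, Dict

# Hypothetical sketch only: stand-ins for phi.llm.schemas.Function and ShellScriptsRegistry.
class FunctionSpec:
    """Wraps a callable together with the name the model can call it by."""

    def __init__(self, name: str, entrypoint: Callable):
        self.name = name
        self.entrypoint = entrypoint

    @classmethod
    def from_callable(cls, c: Callable) -> "FunctionSpec":
        return cls(name=c.__name__, entrypoint=c)


class ToyShellRegistry:
    """Collects its tool methods into a name -> FunctionSpec map."""

    def run_shell_command(self, args: str) -> str:
        import subprocess

        return subprocess.run(args, shell=True, capture_output=True, text=True).stdout

    @property
    def functions(self) -> Dict[str, FunctionSpec]:
        # The commit moves from one hard-coded entry to "everything the registry offers".
        return {"run_shell_command": FunctionSpec.from_callable(self.run_shell_command)}
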
@@ -79,30 +87,34 @@ def __init__(
         self.save_conversation()
         logger.debug(f"--**-- Conversation: {self.conversation_id} --**--")
 
-    async def start_conversation(self, stream: bool = False):
-        from rich import box
-        from rich.prompt import Prompt
-        from rich.live import Live
-        from rich.table import Table
-        from rich.markdown import Markdown
-        from phi.api.ai import conversation_chat
-
+    def start_conversation(self, stream: bool = False):
         conversation_active = True
         while conversation_active:
             username = self.user.username or "You"
             console.rule()
-            user_message = Prompt.ask(f"[bold] :sunglasses: {username} [/bold]", console=console)
-            self.conversation_history.append({"role": "user", "content": user_message})
+            user_message_str_valid = False
+            while not user_message_str_valid:
+                user_message_str = Prompt.ask(f"[bold] :sunglasses: {username} [/bold]", console=console)
+                if (
+                    user_message_str is None
+                    or user_message_str == ""
+                    or user_message_str == "{}"
+                    or len(user_message_str) < 2
+                ):
+                    console.print("Please enter a valid message")
+                    continue
+                user_message_str_valid = True
+            self.conversation_history.append({"role": "user", "content": user_message_str})
 
             # -*- Quit conversation
-            if user_message in ("exit", "quit", "bye"):
+            if user_message_str in ("exit", "quit", "bye"):
                 conversation_active = False
 
             # -*- Send message to Phi AI
             api_response: Optional[Iterator[str]] = conversation_chat(
                 user=self.user,
                 conversation_id=self.conversation_id,
-                message=user_message,
+                message=Message(role="user", content=user_message_str),
                 conversation_type=self.conversation_type,
                 functions=self.functions,
                 stream=stream,
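
Two things change in the hunk above: empty or trivially short input is rejected before anything is sent, and the payload handed to conversation_chat is now a structured Message rather than a raw string. A minimal sketch of the shape this implies (field names are inferred from how Message is used in this diff, not taken from the actual phi.llm.schemas definition):

from dataclasses import dataclass, field
from typing import Any, Dict, Optional

# Assumed shape only; the real Message in phi.llm.schemas is likely a Pydantic model.
@dataclass
class ToyMessage:
    role: str                      # "user", "assistant", or "function"
    content: Optional[str] = None  # message text or a function result
    name: Optional[str] = None     # set for role="function" messages
    metrics: Dict[str, Any] = field(default_factory=dict)  # e.g. {"time": elapsed_seconds}


def is_valid_user_input(text: Optional[str]) -> bool:
    """Mirror of the guard in the loop above: reject None, empty, '{}' and one-character input."""
    return text is not None and text not in ("", "{}") and len(text) >= 2


if __name__ == "__main__":
    raw = "ls -la"
    if is_valid_user_input(raw):
        print(ToyMessage(role="user", content=raw))
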
@@ -112,22 +124,126 @@ async def start_conversation(self, stream: bool = False):
                 conversation_active = False
             else:
                 with Live(console=console) as live:
+                    response_content = ""
                     if stream:
-                        chat_response = ""
                         for _response in api_response:
-                            chat_response += _response
-                            table = Table(show_header=False, box=box.ROUNDED)
-                            table.add_row(Markdown(chat_response))
-                            live.update(table)
-                        self.conversation_history.append({"role": "assistant", "content": chat_response})
+                            if _response is None or _response == "" or _response == "{}":
+                                continue
+                            response_dict = json.loads(_response)
+                            if "content" in response_dict and response_dict.get("content") is not None:
+                                response_content += response_dict.get("content")
+                                table = Table(show_header=False, box=box.ROUNDED)
+                                table.add_row(Markdown(response_content))
+                                live.update(table)
+                            elif "function_call" in response_dict:
+                                for function_response in self.run_function_stream(response_dict.get("function_call")):
+                                    response_content += function_response
+                                    table = Table(show_header=False, box=box.ROUNDED)
+                                    table.add_row(Markdown(response_content))
+                                    live.update(table)
                     else:
-                        chat_response = next(api_response)
+                        _response = next(api_response)
+                        if _response is None or _response == "" or _response == "{}":
+                            response_content = "Something went wrong, please try again."
+                        else:
+                            response_dict = json.loads(_response)
+                            if "content" in response_dict and response_dict.get("content") is not None:
+                                response_content = response_dict.get("content")
+                            elif "function_call" in response_dict:
+                                response_content = self.run_function(response_dict.get("function_call"))
                         table = Table(show_header=False, box=box.ROUNDED)
-                        table.add_row(Markdown(chat_response))
+                        table.add_row(Markdown(response_content))
                         console.print(table)
-                        self.conversation_history.append({"role": "assistant", "content": chat_response})
+                        self.conversation_history.append({"role": "assistant", "content": response_content})
             self.save_conversation()
 
+    def run_function_stream(self, function_call: Dict[str, Any]) -> Iterator[str]:
+        _function_name = function_call.get("name")
+        _function_arguments_str = function_call.get("arguments")
+        if _function_name is not None:
+            function_call_obj: Optional[FunctionCall] = get_function_call(
+                name=_function_name, arguments=_function_arguments_str, functions=self.functions
+            )
+            if function_call_obj is None:
+                return "Something went wrong, please try again."
+
+            # -*- Run function call
+            yield f"Running: {function_call_obj.get_call_str()}\n\n"
+            function_call_timer = Timer()
+            function_call_timer.start()
+            function_call_obj.run()
+            function_call_timer.stop()
+            function_call_message = Message(
+                role="function",
+                name=function_call_obj.function.name,
+                content=function_call_obj.result,
+                metrics={"time": function_call_timer.elapsed},
+            )
+            # -*- Send message to Phi AI
+            api_response: Optional[Iterator[str]] = conversation_chat(
+                user=self.user,
+                conversation_id=self.conversation_id,
+                message=function_call_message,
+                conversation_type=self.conversation_type,
+                functions=self.functions,
+                stream=True,
+            )
+            if api_response is not None:
+                for _response in api_response:
+                    if _response is None or _response == "" or _response == "{}":
+                        continue
+                    response_dict = json.loads(_response)
+                    if "content" in response_dict and response_dict.get("content") is not None:
+                        yield response_dict.get("content")
+                    elif "function_call" in response_dict:
+                        yield from self.run_function_stream(response_dict.get("function_call"))
+        else:
+            yield "Could not run function, please try again."
+
+    def run_function(self, function_call: Dict[str, Any]) -> str:
+        _function_name = function_call.get("name")
+        _function_arguments_str = function_call.get("arguments")
+        if _function_name is not None:
+            function_call_obj: Optional[FunctionCall] = get_function_call(
+                name=_function_name, arguments=_function_arguments_str, functions=self.functions
+            )
+            if function_call_obj is None:
+                return "Something went wrong, please try again."
+
+            # -*- Run function call
+            function_run_response = f"Running: {function_call_obj.get_call_str()}\n\n"
+            function_call_timer = Timer()
+            function_call_timer.start()
+            function_call_obj.run()
+            function_call_timer.stop()
+            function_call_message = Message(
+                role="function",
+                name=function_call_obj.function.name,
+                content=function_call_obj.result,
+                metrics={"time": function_call_timer.elapsed},
+            )
+            # -*- Send message to Phi AI
+            api_response: Optional[Iterator[str]] = conversation_chat(
+                user=self.user,
+                conversation_id=self.conversation_id,
+                message=function_call_message,
+                conversation_type=self.conversation_type,
+                functions=self.functions,
+                stream=False,
+            )
+            if api_response is not None:
+                _response = next(api_response)
+                if _response is None or _response == "" or _response == "{}":
+                    function_run_response += "Something went wrong, please try again."
+                else:
+                    response_dict = json.loads(_response)
+                    if "content" in response_dict and response_dict.get("content") is not None:
+                        function_run_response += response_dict.get("content")
+                    elif "function_call" in response_dict:
+                        function_run_response += self.run_function(response_dict.get("function_call"))
+            return function_run_response
+        return "Something went wrong, please try again."
+
     def print_conversation_history(self):
         from rich import box
         from rich.table import Table
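
The new run_function_stream and run_function methods above are the client side of a function-calling loop: each chunk returned by the API is parsed as JSON, "content" chunks are rendered, and "function_call" chunks are resolved to a local function, executed, and fed back as a role="function" message, recursing if the reply requests another call. A self-contained sketch of that dispatch pattern (the chunk format is inferred from the parsing code above; fake_api_stream and the echo tool are purely illustrative):

import json
from typing import Callable, Dict, Iterator

# Purely illustrative chunk stream: the real chunks come from conversation_chat.
def fake_api_stream() -> Iterator[str]:
    yield json.dumps({"function_call": {"name": "echo", "arguments": json.dumps({"text": "hi"})}})
    yield json.dumps({"content": "Done."})


def dispatch_chunks(chunks: Iterator[str], tools: Dict[str, Callable[..., str]]) -> Iterator[str]:
    """Yield printable text; run any requested tool and fold its output into the transcript."""
    for raw in chunks:
        if not raw or raw == "{}":
            continue  # same guard the diff applies before json.loads
        chunk = json.loads(raw)
        if chunk.get("content") is not None:
            yield chunk["content"]
        elif "function_call" in chunk:
            call = chunk["function_call"]
            fn = tools.get(call.get("name", ""))
            if fn is None:
                yield "Could not run function, please try again."
                continue
            kwargs = json.loads(call.get("arguments") or "{}")
            yield f"Running: {call['name']}({kwargs})\n\n"
            yield fn(**kwargs)  # in the real code the result is sent back to the API instead


if __name__ == "__main__":
    tools = {"echo": lambda text: f"echo -> {text}"}
    for piece in dispatch_chunks(fake_api_stream(), tools):
        print(piece)
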
@@ -191,3 +307,61 @@ def get_latest_conversation(self) -> Optional[Dict[str, Any]]:
         if len(conversations) == 0:
             return None
         return conversations[0]
+
+    # async def conversation(self, stream: bool = False):
+    #     from rich import box
+    #     from rich.prompt import Prompt
+    #     from rich.live import Live
+    #     from rich.table import Table
+    #     from rich.markdown import Markdown
+    #     from phi.api.ai import ai_ws_connect
+    #
+    #     logger.info("Starting conversation with Phi AI")
+    #
+    #     conversation_active = True
+    #     username = self.user.username or "You"
+    #     async with ai_ws_connect(
+    #         user=self.user,
+    #         conversation_id=self.conversation_id,
+    #         conversation_type=self.conversation_type,
+    #         stream=stream,
+    #     ) as ai_ws:
+    #         while conversation_active:
+    #             console.rule()
+    #             user_message = Prompt.ask(f"[bold] :sunglasses: {username} [/bold]", console=console)
+    #             self.conversation_history.append({"role": "user", "content": user_message})
+    #
+    #             # -*- Quit conversation
+    #             if user_message in ("exit", "quit", "bye"):
+    #                 conversation_active = False
+    #
+    #             # -*- Send message to Phi AI
+    #             await ai_ws.send(user_message)
+    #             with Live(console=console) as live:
+    #                 if stream:
+    #                     chat_response = ""
+    #                     ai_response_chunk = await ai_ws.recv()
+    #                     while ai_response_chunk is not None and ai_response_chunk != "AI_RESPONSE_STOP_STREAM":
+    #                         chat_response += ai_response_chunk
+    #                         table = Table(show_header=False, box=box.ROUNDED)
+    #                         table.add_row(Markdown(chat_response))
+    #                         live.update(table)
+    #                         ai_response_chunk = await ai_ws.recv()
+    #                         if ai_response_chunk is None or ai_response_chunk == "AI_RESPONSE_STOP_STREAM":
+    #                             break
+    #                         if ai_response_chunk.startswith("{"):
+    #                             await ai_ws.send("function_result:one")
+    #                     self.conversation_history.append({"role": "assistant", "content": chat_response})
+    #                 else:
+    #                     ai_response = await ai_ws.recv()
+    #                     logger.info(f"ai_response: {type(ai_response)} | {ai_response}")
+    #
+    #                     if ai_response is None:
+    #                         logger.error("Could not reach Phi AI")
+    #                         conversation_active = False
+    #                     chat_response = ai_response
+    #                     table = Table(show_header=False, box=box.ROUNDED)
+    #                     table.add_row(Markdown(chat_response))
+    #                     console.print(table)
+    #                     self.conversation_history.append({"role": "assistant", "content": chat_response})
+    #             self.save_conversation()
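
One detail from run_function and run_function_stream earlier in this diff: each local call is wrapped in a timer, and the elapsed time travels back to the API inside the function-result message under metrics. A minimal stand-in for that pattern (ToyTimer is an assumption; the real phi.utils.timer.Timer API may differ):

import time
from typing import Any, Dict


class ToyTimer:
    """Stand-in for phi.utils.timer.Timer: start/stop and an elapsed attribute."""

    def __init__(self) -> None:
        self._start = 0.0
        self.elapsed = 0.0

    def start(self) -> None:
        self._start = time.perf_counter()

    def stop(self) -> None:
        self.elapsed = time.perf_counter() - self._start


def timed_result_message(name: str, result: str, elapsed: float) -> Dict[str, Any]:
    # Same fields the diff puts on the role="function" Message.
    return {"role": "function", "name": name, "content": result, "metrics": {"time": elapsed}}


if __name__ == "__main__":
    t = ToyTimer()
    t.start()
    result = "total 0\n"  # pretend output of a shell command
    t.stop()
    print(timed_result_message("run_shell_command", result, t.elapsed))
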