diff --git a/.gitignore b/.gitignore index 8e1e1380d..f1fd60989 100644 --- a/.gitignore +++ b/.gitignore @@ -48,4 +48,6 @@ data.db .ipynb_checkpoints +audio_generations + *.db diff --git a/cookbook/agents/37_audio_input_output.py b/cookbook/agents/37_audio_input_output.py index c10791584..f7e91f65e 100644 --- a/cookbook/agents/37_audio_input_output.py +++ b/cookbook/agents/37_audio_input_output.py @@ -2,6 +2,7 @@ import requests from phi.agent import Agent from phi.model.openai import OpenAIChat +from phi.utils.audio import write_audio_to_file # Fetch the audio file and convert it to a base64 encoded string url = "https://openaiassets.blob.core.windows.net/$web/API/docs/audio/alloy.wav" @@ -22,7 +23,5 @@ audio={"data": encoded_string, "format": "wav"}, ) -if agent.run_response.audio is not None and "data" in agent.run_response.audio: - wav_bytes = base64.b64decode(agent.run_response.audio["data"]) - with open("dog.wav", "wb") as f: - f.write(wav_bytes) +if agent.run_response.response_audio is not None and "data" in agent.run_response.response_audio: + write_audio_to_file(audio=agent.run_response.response_audio["data"], filename="tmp/dog.wav") diff --git a/cookbook/agents/38_audio_multi_turn.py b/cookbook/agents/38_audio_multi_turn.py index 0cb4a6553..92e0fbfd1 100644 --- a/cookbook/agents/38_audio_multi_turn.py +++ b/cookbook/agents/38_audio_multi_turn.py @@ -1,22 +1,19 @@ -import base64 from phi.agent import Agent from phi.model.openai import OpenAIChat +from phi.utils.audio import write_audio_to_file agent = Agent( model=OpenAIChat( id="gpt-4o-audio-preview", modalities=["text", "audio"], audio={"voice": "alloy", "format": "wav"} ), + debug_mode=True, add_history_to_messages=True, ) agent.run("Is a golden retriever a good family dog?") -if agent.run_response.audio is not None and "data" in agent.run_response.audio: - wav_bytes = base64.b64decode(agent.run_response.audio["data"]) - with open("tmp/answer_1.wav", "wb") as f: - f.write(wav_bytes) +if agent.run_response.response_audio is not None and "data" in agent.run_response.response_audio: + write_audio_to_file(audio=agent.run_response.response_audio["data"], filename="tmp/answer_1.wav") agent.run("Why do you say they are loyal?") -if agent.run_response.audio is not None and "data" in agent.run_response.audio: - wav_bytes = base64.b64decode(agent.run_response.audio["data"]) - with open("tmp/answer_2.wav", "wb") as f: - f.write(wav_bytes) +if agent.run_response.response_audio is not None and "data" in agent.run_response.response_audio: + write_audio_to_file(audio=agent.run_response.response_audio["data"], filename="tmp/answer_2.wav") diff --git a/cookbook/agents/42_image_to_audio.py b/cookbook/agents/42_image_to_audio.py index 8519f097b..8cbe4f11b 100644 --- a/cookbook/agents/42_image_to_audio.py +++ b/cookbook/agents/42_image_to_audio.py @@ -1,10 +1,10 @@ -import base64 from pathlib import Path from rich import print from rich.text import Text from phi.agent import Agent, RunResponse from phi.model.openai import OpenAIChat +from phi.utils.audio import write_audio_to_file cwd = Path(__file__).parent.resolve() @@ -23,7 +23,5 @@ ) audio_story: RunResponse = audio_agent.run(f"Narrate the story with flair: {image_story.content}") -if audio_story.audio is not None and "data" in audio_story.audio: - wav_bytes = base64.b64decode(audio_story.audio["data"]) - with open(cwd.joinpath("tmp/multimodal-agents.wav"), "wb") as f: - f.write(wav_bytes) +if audio_story.response_audio is not None and "data" in audio_story.response_audio: + 
write_audio_to_file(audio=audio_story.response_audio["data"], filename="tmp/multimodal-agents.wav") diff --git a/cookbook/embedders/cohere_embedder.py b/cookbook/embedders/cohere_embedder.py new file mode 100644 index 000000000..be36af603 --- /dev/null +++ b/cookbook/embedders/cohere_embedder.py @@ -0,0 +1,18 @@ +from phi.agent import AgentKnowledge +from phi.vectordb.pgvector import PgVector +from phi.embedder.cohere import CohereEmbedder + +embeddings = CohereEmbedder().get_embedding("The quick brown fox jumps over the lazy dog.") +# Print the embeddings and their dimensions +print(f"Embeddings: {embeddings[:5]}") +print(f"Dimensions: {len(embeddings)}") + +# Example usage: +knowledge_base = AgentKnowledge( + vector_db=PgVector( + db_url="postgresql+psycopg://ai:ai@localhost:5532/ai", + table_name="cohere_embeddings", + embedder=CohereEmbedder(), + ), + num_documents=2, +) diff --git a/cookbook/examples/agents/study_partner.py b/cookbook/examples/agents/04_study_partner.py similarity index 100% rename from cookbook/examples/agents/study_partner.py rename to cookbook/examples/agents/04_study_partner.py diff --git a/cookbook/examples/agents/05_shopping_partner.py b/cookbook/examples/agents/05_shopping_partner.py new file mode 100644 index 000000000..9803baec6 --- /dev/null +++ b/cookbook/examples/agents/05_shopping_partner.py @@ -0,0 +1,21 @@ +from phi.agent import Agent +from phi.model.openai import OpenAIChat +from phi.tools.firecrawl import FirecrawlTools + +agent = Agent( + name="shopping partner", + model=OpenAIChat(id="gpt-4o"), + instructions=[ + "You are a product recommender agent specializing in finding products that match user preferences.", + "Prioritize finding products that satisfy as many user requirements as possible, but ensure a minimum match of 50%.", + "Search for products only from authentic and trusted e-commerce websites such as Amazon, Flipkart, Myntra, Meesho, Google Shopping, Nike, and other reputable platforms.", + "Verify that each product recommendation is in stock and available for purchase.", + "Avoid suggesting counterfeit or unverified products.", + "Clearly mention the key attributes of each product (e.g., price, brand, features) in the response.", + "Format the recommendations neatly and ensure clarity for ease of user understanding.", + ], + tools=[FirecrawlTools()], +) +agent.print_response( + "I am looking for running shoes with the following preferences: Color: Black Purpose: Comfortable for long-distance running Budget: Under Rs. 
10,000" ) diff --git a/cookbook/examples/agents/06_book_recommendation.py b/cookbook/examples/agents/06_book_recommendation.py new file mode 100644 index 000000000..0ddb6af8a --- /dev/null +++ b/cookbook/examples/agents/06_book_recommendation.py @@ -0,0 +1,24 @@ +from phi.agent import Agent +from phi.model.openai import OpenAIChat +from phi.tools.exa import ExaTools + +agent = Agent( + description="you help the user with book recommendations", + name="Shelfie", + model=OpenAIChat(id="gpt-4o"), + instructions=[ + "You are a highly knowledgeable book recommendation agent.", + "Your goal is to help the user discover books based on their preferences, reading history, and interests.", + "If the user mentions a specific genre, suggest books that span both classics and modern hits.", + "When the user mentions an author, recommend similar authors or series they may enjoy.", + "Highlight notable accomplishments of the book, such as awards, best-seller status, or critical acclaim.", + "Provide a short summary or teaser for each book recommended.", + "Offer up to 5 book recommendations for each request, ensuring they are diverse and relevant.", + "Leverage online resources like Goodreads, StoryGraph, and LibraryThing for accurate and varied suggestions.", + "Focus on being concise, relevant, and thoughtful in your recommendations.", + ], + tools=[ExaTools()], +) +agent.print_response( + "I really found Anxious People and Lessons in Chemistry interesting; can you suggest more books like them?" +) diff --git a/cookbook/examples/agents/07_weekend_planner.py b/cookbook/examples/agents/07_weekend_planner.py new file mode 100644 index 000000000..47fa21cc1 --- /dev/null +++ b/cookbook/examples/agents/07_weekend_planner.py @@ -0,0 +1,24 @@ +from phi.agent import Agent +from phi.model.openai import OpenAIChat +from phi.tools.exa import ExaTools + +agent = Agent( + description="you help the user plan their weekends", + name="TimeOut", + model=OpenAIChat(id="gpt-4o"), + instructions=[ + "You are a weekend planning assistant that helps users create a personalized weekend itinerary.", + "Always mention the timeframe, location, and year provided by the user (e.g., '16–17 December 2023 in Bangalore'). Recommendations should align with the specified dates.", + "Provide responses in these sections: Events, Activities, Dining Options.", + "- **Events**: Include name, date, time, location, a brief description, and booking links from platforms like BookMyShow or Insider.in.", + "- **Activities**: Suggest engaging options with estimated time required, location, and additional tips (e.g., best time to visit).", + "- **Dining Options**: Recommend restaurants or cafés with cuisine highlights and links to platforms like Zomato or Google Maps.", + "Ensure all recommendations are for the current or future dates relevant to the query. Avoid past events.", + "If no specific data is available for the dates, suggest general activities or evergreen attractions in the city.", + "Keep responses concise, clear, and formatted for easy reading.", + ], + tools=[ExaTools()], +) +agent.print_response( + "I want to plan my coming weekend filled with fun and Christmas-themed activities in Bangalore for 21 and 22 Dec 2024."
+) diff --git a/cookbook/playground/gemini_agents.py b/cookbook/playground/gemini_agents.py new file mode 100644 index 000000000..c8eded2a6 --- /dev/null +++ b/cookbook/playground/gemini_agents.py @@ -0,0 +1,16 @@ +from phi.agent import Agent +from phi.tools.yfinance import YFinanceTools +from phi.playground import Playground, serve_playground_app +from phi.model.google import Gemini + +finance_agent = Agent( + name="Finance Agent", + model=Gemini(id="gemini-2.0-flash-exp"), + tools=[YFinanceTools(stock_price=True)], + debug_mode=True, +) + +app = Playground(agents=[finance_agent]).get_app(use_async=False) + +if __name__ == "__main__": + serve_playground_app("gemini_agents:app", reload=True) diff --git a/cookbook/playground/multimodal_agent.py b/cookbook/playground/multimodal_agent.py index d9e8996d7..412168ec5 100644 --- a/cookbook/playground/multimodal_agent.py +++ b/cookbook/playground/multimodal_agent.py @@ -9,6 +9,7 @@ from phi.agent import Agent from phi.model.openai import OpenAIChat from phi.tools.dalle import Dalle +from phi.tools.eleven_labs_tools import ElevenLabsTools from phi.tools.giphy import GiphyTools from phi.tools.models_labs import ModelsLabs from phi.model.response import FileType @@ -88,6 +89,7 @@ gif_agent = Agent( name="Gif Generator Agent", + agent_id="gif_agent", model=OpenAIChat(id="gpt-4o"), tools=[GiphyTools()], description="You are an AI agent that can generate gifs using Giphy.", @@ -102,8 +104,34 @@ storage=SqlAgentStorage(table_name="gif_agent", db_file=image_agent_storage_file), ) +audio_agent = Agent( + name="Audio Generator Agent", + agent_id="audio_agent", + model=OpenAIChat(id="gpt-4o"), + tools=[ + ElevenLabsTools( + voice_id="JBFqnCBsd6RMkjVDRZzb", model_id="eleven_multilingual_v2", target_directory="audio_generations" + ) + ], + description="You are an AI agent that can generate audio using the ElevenLabs API.", + instructions=[ + "When the user asks you to generate audio, use the `text_to_speech` tool to generate the audio.", + "You'll generate the appropriate prompt to send to the tool to generate audio.", + "You don't need to find the appropriate voice first, I already specified the voice to use.", + "Don't return the file name or file URL in your response or markdown; just say that the audio was created successfully.", + "The audio should be long and detailed.", + ], + markdown=True, + debug_mode=True, + add_history_to_messages=True, + add_datetime_to_instructions=True, + storage=SqlAgentStorage(table_name="audio_agent", db_file=image_agent_storage_file), +) -app = Playground(agents=[image_agent, ml_gif_agent, ml_video_agent, fal_agent, gif_agent]).get_app(use_async=False) + +app = Playground(agents=[image_agent, ml_gif_agent, ml_video_agent, fal_agent, gif_agent, audio_agent]).get_app( + use_async=False +) if __name__ == "__main__": serve_playground_app("multimodal_agent:app", reload=True) diff --git a/cookbook/providers/google/flash_thinking.py b/cookbook/providers/google/flash_thinking.py new file mode 100644 index 000000000..0e2514f7a --- /dev/null +++ b/cookbook/providers/google/flash_thinking.py @@ -0,0 +1,12 @@ +from phi.agent import Agent +from phi.model.google import Gemini + +task = ( + "Three missionaries and three cannibals need to cross a river. " + "They have a boat that can carry up to two people at a time. " + "If, at any time, the cannibals outnumber the missionaries on either side of the river, the cannibals will eat the missionaries. " + "How can all six people get across the river safely? 
Provide a step-by-step solution and show the solution as an ASCII diagram" +) + +agent = Agent(model=Gemini(id="gemini-2.0-flash-thinking-exp-1219"), markdown=True) +agent.print_response(task, stream=True) diff --git a/cookbook/providers/ollama/agent_stream.py b/cookbook/providers/ollama/agent_stream.py index 13c8e1060..d39e0b46e 100644 --- a/cookbook/providers/ollama/agent_stream.py +++ b/cookbook/providers/ollama/agent_stream.py @@ -3,6 +3,7 @@ from typing import Iterator # noqa from phi.agent import Agent, RunResponse # noqa from phi.model.ollama import Ollama +from phi.tools.crawl4ai_tools import Crawl4aiTools from phi.tools.yfinance import YFinanceTools agent = Agent( @@ -20,3 +21,10 @@ # Print the response in the terminal agent.print_response("What are analyst recommendations for NVDA and TSLA", stream=True) + + +agent = Agent(model=Ollama(id="llama3.1:8b"), tools=[Crawl4aiTools(max_length=1000)], show_tool_calls=True) +agent.print_response( + "Summarize the key points of this page in bullet points: https://blog.google/products/gemini/google-gemini-deep-research/", + stream=True, +) diff --git a/cookbook/storage/json_storage.py b/cookbook/storage/json_storage.py new file mode 100644 index 000000000..67509dd4e --- /dev/null +++ b/cookbook/storage/json_storage.py @@ -0,0 +1,13 @@ +"""Run `pip install duckduckgo-search openai` to install dependencies.""" + +from phi.agent import Agent +from phi.tools.duckduckgo import DuckDuckGo +from phi.storage.agent.json import JsonFileAgentStorage + +agent = Agent( + storage=JsonFileAgentStorage(dir_path="tmp/agent_sessions_json"), + tools=[DuckDuckGo()], + add_history_to_messages=True, +) +agent.print_response("How many people live in Canada?") +agent.print_response("What is their national anthem called?") diff --git a/cookbook/storage/yaml_storage.py b/cookbook/storage/yaml_storage.py new file mode 100644 index 000000000..70e894680 --- /dev/null +++ b/cookbook/storage/yaml_storage.py @@ -0,0 +1,13 @@ +"""Run `pip install duckduckgo-search openai` to install dependencies.""" + +from phi.agent import Agent +from phi.tools.duckduckgo import DuckDuckGo +from phi.storage.agent.yaml import YamlFileAgentStorage + +agent = Agent( + storage=YamlFileAgentStorage(dir_path="tmp/agent_sessions_yaml"), + tools=[DuckDuckGo()], + add_history_to_messages=True, +) +agent.print_response("How many people live in Canada?") +agent.print_response("What is their national anthem called?") diff --git a/cookbook/tools/confluence_tools.py b/cookbook/tools/confluence_tools.py new file mode 100644 index 000000000..6e989909d --- /dev/null +++ b/cookbook/tools/confluence_tools.py @@ -0,0 +1,22 @@ +from phi.agent import Agent +from phi.tools.confluence import ConfluenceTools + + +agent = Agent( + name="Confluence agent", + tools=[ConfluenceTools()], + show_tool_calls=True, + markdown=True, +) + +## getting space details +agent.print_response("How many spaces are there and what are their names?") + +## getting page_content +agent.print_response("What is the content present in page 'Large language model in LLM space'") + +## getting page details in a particular space +agent.print_response("Can you extract all the page names from 'LLM' space") + +## creating a new page in a space +agent.print_response("Can you create a new page named 'TESTING' in 'LLM' space") diff --git a/cookbook/tools/elevenlabs_tools.py b/cookbook/tools/elevenlabs_tools.py new file mode 100644 index 000000000..a1c3d711e --- /dev/null +++ b/cookbook/tools/elevenlabs_tools.py @@ -0,0 +1,32 @@ +""" +pip install 
elevenlabs """ + +from phi.agent import Agent +from phi.model.openai import OpenAIChat +from phi.tools.eleven_labs_tools import ElevenLabsTools + +audio_agent = Agent( + model=OpenAIChat(id="gpt-4o"), + tools=[ + ElevenLabsTools( + voice_id="21m00Tcm4TlvDq8ikWAM", model_id="eleven_multilingual_v2", target_directory="audio_generations" + ) + ], + description="You are an AI agent that can generate audio using the ElevenLabs API.", + instructions=[ + "When the user asks you to generate audio, use the `text_to_speech` tool to generate the audio.", + "You'll generate the appropriate prompt to send to the tool to generate audio.", + "You don't need to find the appropriate voice first, I already specified the voice to use.", + "Return the audio file name in your response. Don't convert it to markdown.", + "The audio should be long and detailed.", + ], + markdown=True, + debug_mode=True, + show_tool_calls=True, +) + +audio_agent.print_response("Generate a very long audio of the history of the French Revolution") + + +audio_agent.print_response("Generate a kick sound effect") diff --git a/cookbook/workflows/startup_idea_validator.py b/cookbook/workflows/startup_idea_validator.py new file mode 100644 index 000000000..c4070f622 --- /dev/null +++ b/cookbook/workflows/startup_idea_validator.py @@ -0,0 +1,213 @@ +""" +1. Install dependencies using: `pip install openai exa_py sqlalchemy phidata` +2. Run the script using: `python cookbook/workflows/startup_idea_validator.py` +""" + +import json +from typing import Optional, Iterator + +from pydantic import BaseModel, Field + +from phi.agent import Agent +from phi.model.openai import OpenAIChat +from phi.tools.googlesearch import GoogleSearch +from phi.workflow import Workflow, RunResponse, RunEvent +from phi.storage.workflow.sqlite import SqlWorkflowStorage +from phi.utils.pprint import pprint_run_response +from phi.utils.log import logger + + +class IdeaClarification(BaseModel): + originality: str = Field(..., description="Originality of the idea.") + mission: str = Field(..., description="Mission of the company.") + objectives: str = Field(..., description="Objectives of the company.") + + +class MarketResearch(BaseModel): + total_addressable_market: str = Field(..., description="Total addressable market (TAM).") + serviceable_available_market: str = Field(..., description="Serviceable available market (SAM).") + serviceable_obtainable_market: str = Field(..., description="Serviceable obtainable market (SOM).") + target_customer_segments: str = Field(..., description="Target customer segments.") + + +class StartupIdeaValidator(Workflow): + idea_clarifier_agent: Agent = Agent( + model=OpenAIChat(id="gpt-4o-mini"), + instructions=[ + "Given a user's startup idea, it's your goal to refine that idea. ", + "Evaluate the originality of the idea by comparing it with existing concepts. ", + "Define the mission and objectives of the startup.", + ], + add_history_to_messages=True, + add_datetime_to_instructions=True, + response_model=IdeaClarification, + structured_outputs=True, + debug_mode=False, + ) + + market_research_agent: Agent = Agent( + model=OpenAIChat(id="gpt-4o-mini"), + tools=[GoogleSearch()], + instructions=[ + "You are provided with a startup idea and the company's mission and objectives. ", + "Estimate the total addressable market (TAM), serviceable available market (SAM), and serviceable obtainable market (SOM). ", + "Define target customer segments and their characteristics. 
", + "Search the web for resources if you need to.", + ], + add_history_to_messages=True, + add_datetime_to_instructions=True, + response_model=MarketResearch, + structured_outputs=True, + debug_mode=False, + ) + + competitor_analysis_agent: Agent = Agent( + model=OpenAIChat(id="gpt-4o-mini"), + tools=[GoogleSearch()], + instructions=[ + "You are provided with a startup idea and some market research related to the idea. ", + "Identify existing competitors in the market. ", + "Perform Strengths, Weaknesses, Opportunities, and Threats (SWOT) analysis for each competitor. ", + "Assess the startup’s potential positioning relative to competitors.", + ], + add_history_to_messages=True, + add_datetime_to_instructions=True, + markdown=True, + debug_mode=False, + ) + + report_agent: Agent = Agent( + model=OpenAIChat(id="gpt-4o-mini"), + instructions=[ + "You are provided with a startup idea and other data about the idea. ", + "Summarise everything into a single report.", + ], + add_history_to_messages=True, + add_datetime_to_instructions=True, + markdown=True, + debug_mode=False, + ) + + def get_idea_clarification(self, startup_idea: str) -> Optional[IdeaClarification]: + try: + response: RunResponse = self.idea_clarifier_agent.run(startup_idea) + + # Check if we got a valid response + if not response or not response.content: + logger.warning("Empty Idea Clarification response") + # Check if the response is of the expected type + if not isinstance(response.content, IdeaClarification): + logger.warning("Invalid response type") + + return response.content + + except Exception as e: + logger.warning(f"Failed: {str(e)}") + + return None + + def get_market_research(self, startup_idea: str, idea_clarification: IdeaClarification) -> Optional[MarketResearch]: + agent_input = {"startup_idea": startup_idea, **idea_clarification.model_dump()} + + try: + response: RunResponse = self.market_research_agent.run(json.dumps(agent_input, indent=4)) + + # Check if we got a valid response + if not response or not response.content: + logger.warning("Empty Market Research response") + + # Check if the response is of the expected type + if not isinstance(response.content, MarketResearch): + logger.warning("Invalid response type") + + return response.content + + except Exception as e: + logger.warning(f"Failed: {str(e)}") + + return None + + def get_competitor_analysis(self, startup_idea: str, market_research: MarketResearch) -> Optional[str]: + agent_input = {"startup_idea": startup_idea, **market_research.model_dump()} + + try: + response: RunResponse = self.competitor_analysis_agent.run(json.dumps(agent_input, indent=4)) + + # Check if we got a valid response + if not response or not response.content: + logger.warning("Empty Competitor Analysis response") + + return response.content + + except Exception as e: + logger.warning(f"Failed: {str(e)}") + + return None + + def run(self, startup_idea: str) -> Iterator[RunResponse]: + logger.info(f"Generating a startup validation report for: {startup_idea}") + + # Clarify and quantify the idea + idea_clarification: Optional[IdeaClarification] = self.get_idea_clarification(startup_idea) + + if idea_clarification is None: + yield RunResponse( + event=RunEvent.workflow_completed, + content=f"Sorry, could not even clarify the idea: {startup_idea}", + ) + return + + # Do some market research + market_research: Optional[MarketResearch] = self.get_market_research(startup_idea, idea_clarification) + + if market_research is None: + yield RunResponse( + event=RunEvent.workflow_completed, 
+ content="Market research failed", + ) + return + + competitor_analysis: Optional[str] = self.get_competitor_analysis(startup_idea, market_research) + + # Compile the final report + final_response: RunResponse = self.report_agent.run( + json.dumps( + { + "startup_idea": startup_idea, + **idea_clarification.model_dump(), + **market_research.model_dump(), + "competitor_analysis_report": competitor_analysis, + }, + indent=4, + ) + ) + + yield RunResponse(content=final_response.content, event=RunEvent.workflow_completed) + + +# Run the workflow if the script is executed directly
if __name__ == "__main__": + from rich.prompt import Prompt + + # Get idea from user + idea = Prompt.ask( + "[bold]What is your startup idea?[/bold]\n✨", + default="A marketplace for Christmas Ornaments made from leather", + ) + + # Convert the idea to a URL-safe string for use in session_id + url_safe_idea = idea.lower().replace(" ", "-") + + startup_idea_validator = StartupIdeaValidator( + description="Startup Idea Validator", + session_id=f"validate-startup-idea-{url_safe_idea}", + storage=SqlWorkflowStorage( + table_name="validate_startup_ideas_workflow", + db_file="tmp/workflows.db", + ), + debug_mode=True + ) + + final_report: Iterator[RunResponse] = startup_idea_validator.run(startup_idea=idea) + + pprint_run_response(final_report, markdown=True) diff --git a/phi/agent/agent.py b/phi/agent/agent.py index dfb4d12bb..50f2220c3 100644 --- a/phi/agent/agent.py +++ b/phi/agent/agent.py @@ -28,7 +28,7 @@ from phi.document import Document from phi.agent.session import AgentSession -from phi.model.content import Image, Video +from phi.model.content import Image, Video, Audio from phi.reasoning.step import ReasoningStep, ReasoningSteps, NextAction from phi.run.response import RunEvent, RunResponse, RunResponseExtraData from phi.knowledge.agent import AgentKnowledge @@ -61,6 +61,8 @@ class Agent(BaseModel): images: Optional[List[Image]] = None # Videos associated with this agent videos: Optional[List[Video]] = None + # Audio associated with this agent + audio: Optional[List[Audio]] = None # Data associated with this agent # name, model, images and videos are automatically added to the agent_data @@ -577,6 +579,8 @@ def get_agent_data(self) -> Dict[str, Any]: agent_data["images"] = [img if isinstance(img, dict) else img.model_dump() for img in self.images] if self.videos is not None: agent_data["videos"] = [vid if isinstance(vid, dict) else vid.model_dump() for vid in self.videos] + if self.audio is not None: + agent_data["audio"] = [aud if isinstance(aud, dict) else aud.model_dump() for aud in self.audio] return agent_data def get_session_data(self) -> Dict[str, Any]: @@ -641,6 +645,12 @@ def from_agent_session(self, session: AgentSession): self.videos.extend([Video.model_validate(vid) for vid in self.videos]) else: self.videos = videos_from_db + if "audio" in session.agent_data: + audio_from_db = session.agent_data.get("audio") + if self.audio is not None and isinstance(self.audio, list): + self.audio.extend([Audio.model_validate(aud) for aud in audio_from_db]) + else: + self.audio = audio_from_db # If agent_data is set in the agent, update the database agent_data with the agent's agent_data if self.agent_data is not None: @@ -1706,8 +1716,10 @@ def generic_run_response( agent_id=self.agent_id, content=content, tools=self.run_response.tools, + audio=self.run_response.audio, images=self.run_response.images, videos=self.run_response.videos, + response_audio=self.run_response.response_audio, model=self.run_response.model, 
messages=self.run_response.messages, extra_data=self.run_response.extra_data, @@ -1798,6 +1810,7 @@ def _run( self.run_response.content = model_response_chunk.content self.run_response.created_at = model_response_chunk.created_at yield self.run_response + elif model_response_chunk.event == ModelResponseEvent.tool_call_started.value: # Add tool call to the run_response tool_call_dict = model_response_chunk.tool_call @@ -1834,7 +1847,7 @@ def _run( else: self.run_response.content = model_response.content if model_response.audio is not None: - self.run_response.audio = model_response.audio + self.run_response.response_audio = model_response.audio self.run_response.messages = messages_for_model self.run_response.created_at = model_response.created_at @@ -1848,6 +1861,8 @@ def _run( # Update the run_response content if streaming as run_response will only contain the last chunk if self.stream: self.run_response.content = model_response.content + if model_response.audio is not None: + self.run_response.response_audio = model_response.audio # 6. Update Memory if self.stream_intermediate_steps: @@ -2186,6 +2201,8 @@ async def _arun( # Update the run_response content if streaming as run_response will only contain the last chunk if self.stream: self.run_response.content = model_response.content + if model_response.audio is not None: + self.run_response.response_audio = model_response.audio # 6. Update Memory if self.stream_intermediate_steps: @@ -2469,12 +2486,24 @@ def add_video(self, video: Video) -> None: self.run_response.videos = [] self.run_response.videos.append(video) + def add_audio(self, audio: Audio) -> None: + if self.audio is None: + self.audio = [] + self.audio.append(audio) + if self.run_response is not None: + if self.run_response.audio is None: + self.run_response.audio = [] + self.run_response.audio.append(audio) + def get_images(self) -> Optional[List[Image]]: return self.images def get_videos(self) -> Optional[List[Video]]: return self.videos + def get_audio(self) -> Optional[List[Audio]]: + return self.audio + ########################################################################### # Default Tools ########################################################################### diff --git a/phi/agent/session.py b/phi/agent/session.py index ee50c6c3e..3d2e8254f 100644 --- a/phi/agent/session.py +++ b/phi/agent/session.py @@ -27,17 +27,25 @@ class AgentSession(BaseModel): model_config = ConfigDict(from_attributes=True) def monitoring_data(self) -> Dict[str, Any]: - monitoring_data = self.model_dump(exclude={"memory"}) # Google Gemini adds a "parts" field to the messages, which is not serializable - # If there are runs in the memory, remove the "parts" from the messages - if self.memory is not None and "runs" in self.memory: - _runs = self.memory["runs"] - if len(_runs) > 0: - for _run in _runs: - if "messages" in _run: - for m in _run["messages"]: - if isinstance(m, dict): - m.pop("parts", None) + # If the provider is Google, remove the "parts" from the messages + if self.agent_data is not None: + if self.agent_data.get("model", {}).get("provider") == "Google" and self.memory is not None: + # Remove parts from runs' response messages + if "runs" in self.memory: + for _run in self.memory["runs"]: + if "response" in _run and "messages" in _run["response"]: + for m in _run["response"]["messages"]: + if isinstance(m, dict): + m.pop("parts", None) + + # Remove parts from top-level memory messages + if "messages" in self.memory: + for m in self.memory["messages"]: + if isinstance(m, dict): 
+ m.pop("parts", None) + + monitoring_data = self.model_dump() return monitoring_data def telemetry_data(self) -> Dict[str, Any]: diff --git a/phi/document/chunking/recursive.py b/phi/document/chunking/recursive.py index 662a9218c..47c552294 100644 --- a/phi/document/chunking/recursive.py +++ b/phi/document/chunking/recursive.py @@ -38,6 +38,7 @@ def chunk(self, document: Document) -> List[Document]: chunk_id = None if document.id: chunk_id = f"{document.id}_{chunk_number}" + chunk_number += 1 meta_data["chunk_size"] = len(chunk) chunks.append(Document(id=chunk_id, name=document.name, meta_data=meta_data, content=chunk)) diff --git a/phi/embedder/cohere.py b/phi/embedder/cohere.py new file mode 100644 index 000000000..6b3b3594c --- /dev/null +++ b/phi/embedder/cohere.py @@ -0,0 +1,70 @@ +from typing import Optional, Dict, List, Tuple, Any, Union + +from phi.embedder.base import Embedder +from phi.utils.log import logger + +try: + from cohere import Client as CohereClient + from cohere.types.embed_response import EmbeddingsFloatsEmbedResponse, EmbeddingsByTypeEmbedResponse +except ImportError: + raise ImportError("`cohere` not installed. Please install using `pip install cohere`.") + + +class CohereEmbedder(Embedder): + model: str = "embed-english-v3.0" + input_type: str = "search_query" + embedding_types: Optional[List[str]] = None + api_key: Optional[str] = None + request_params: Optional[Dict[str, Any]] = None + client_params: Optional[Dict[str, Any]] = None + cohere_client: Optional[CohereClient] = None + + @property + def client(self) -> CohereClient: + if self.cohere_client: + return self.cohere_client + client_params: Dict[str, Any] = {} + if self.api_key: + client_params["api_key"] = self.api_key + return CohereClient(**client_params) + + def response(self, text: str) -> Union[EmbeddingsFloatsEmbedResponse, EmbeddingsByTypeEmbedResponse]: + request_params: Dict[str, Any] = {} + + if self.model: + request_params["model"] = self.model + if self.input_type: + request_params["input_type"] = self.input_type + if self.embedding_types: + request_params["embedding_types"] = self.embedding_types + if self.request_params: + request_params.update(self.request_params) + return self.client.embed(texts=[text], **request_params) + + def get_embedding(self, text: str) -> List[float]: + response: Union[EmbeddingsFloatsEmbedResponse, EmbeddingsByTypeEmbedResponse] = self.response(text=text) + try: + if isinstance(response, EmbeddingsFloatsEmbedResponse): + return response.embeddings[0] + elif isinstance(response, EmbeddingsByTypeEmbedResponse): + return response.embeddings.float_[0] if response.embeddings.float_ else [] + else: + logger.warning("No embeddings found") + return [] + except Exception as e: + logger.warning(e) + return [] + + def get_embedding_and_usage(self, text: str) -> Tuple[List[float], Optional[Dict[str, Any]]]: + response: Union[EmbeddingsFloatsEmbedResponse, EmbeddingsByTypeEmbedResponse] = self.response(text=text) + + embedding: List[float] = [] + if isinstance(response, EmbeddingsFloatsEmbedResponse): + embedding = response.embeddings[0] + elif isinstance(response, EmbeddingsByTypeEmbedResponse): + embedding = response.embeddings.float_[0] if response.embeddings.float_ else [] + + usage = response.meta.billed_units if response.meta else None + if usage: + return embedding, usage.model_dump() + return embedding, None diff --git a/phi/embedder/openai.py b/phi/embedder/openai.py index dc223c121..db1d27a40 100644 --- a/phi/embedder/openai.py +++ b/phi/embedder/openai.py @@ -39,7 
+39,7 @@ def client(self) -> OpenAIClient: _client_params.update(self.client_params) return OpenAIClient(**_client_params) - def _response(self, text: str) -> CreateEmbeddingResponse: + def response(self, text: str) -> CreateEmbeddingResponse: _request_params: Dict[str, Any] = { "input": text, "model": self.model, @@ -54,7 +54,7 @@ def _response(self, text: str) -> CreateEmbeddingResponse: return self.client.embeddings.create(**_request_params) def get_embedding(self, text: str) -> List[float]: - response: CreateEmbeddingResponse = self._response(text=text) + response: CreateEmbeddingResponse = self.response(text=text) try: return response.data[0].embedding except Exception as e: @@ -62,7 +62,7 @@ def get_embedding(self, text: str) -> List[float]: return [] def get_embedding_and_usage(self, text: str) -> Tuple[List[float], Optional[Dict]]: - response: CreateEmbeddingResponse = self._response(text=text) + response: CreateEmbeddingResponse = self.response(text=text) embedding = response.data[0].embedding usage = response.usage diff --git a/phi/memory/agent.py b/phi/memory/agent.py index 6bfd6c185..5f3a7dea1 100644 --- a/phi/memory/agent.py +++ b/phi/memory/agent.py @@ -1,5 +1,6 @@ from enum import Enum from typing import Dict, List, Any, Optional, Tuple +from copy import deepcopy from pydantic import BaseModel, ConfigDict @@ -357,8 +358,22 @@ def clear(self) -> None: self.summary = None self.memories = None - def deep_copy(self, *, update: Optional[Dict[str, Any]] = None) -> "AgentMemory": - new_memory = self.model_copy(deep=True, update=update) - # clear the new memory to remove any references to the old memory - new_memory.clear() - return new_memory + def deep_copy(self): + # Create a shallow copy of the object + copied_obj = self.__class__(**self.model_dump()) + + # Manually deepcopy fields that are known to be safe + for field_name, field_value in self.__dict__.items(): + if field_name not in ["db", "classifier", "manager", "summarizer"]: + try: + setattr(copied_obj, field_name, deepcopy(field_value)) + except Exception as e: + logger.warning(f"Failed to deepcopy field: {field_name} - {e}") + setattr(copied_obj, field_name, field_value) + + copied_obj.db = self.db + copied_obj.classifier = self.classifier + copied_obj.manager = self.manager + copied_obj.summarizer = self.summarizer + + return copied_obj diff --git a/phi/memory/summarizer.py b/phi/memory/summarizer.py index 2b771374e..6b4799add 100644 --- a/phi/memory/summarizer.py +++ b/phi/memory/summarizer.py @@ -44,13 +44,15 @@ def get_system_message(self, messages_for_summarization: List[Dict[str, str]]) - Conversation: """) - - system_prompt += "\n".join( - [ - f"User: {message_pair['user']}\nAssistant: {message_pair['assistant']}" - for message_pair in messages_for_summarization - ] - ) + conversation = [] + for message_pair in messages_for_summarization: + conversation.append(f"User: {message_pair['user']}") + if "assistant" in message_pair: + conversation.append(f"Assistant: {message_pair['assistant']}") + elif "model" in message_pair: + conversation.append(f"Assistant: {message_pair['model']}") + + system_prompt += "\n".join(conversation) if not self.use_structured_outputs: system_prompt += "\n\nProvide your output as a JSON containing the following fields:" diff --git a/phi/model/content.py b/phi/model/content.py index 529e90fef..228c0dda9 100644 --- a/phi/model/content.py +++ b/phi/model/content.py @@ -1,19 +1,37 @@ -from typing import Optional +from typing import Optional, Any -from pydantic import BaseModel +from pydantic 
import BaseModel, model_validator -class Video(BaseModel): +class Media(BaseModel): id: str - url: str original_prompt: Optional[str] = None revised_prompt: Optional[str] = None + + +class Video(Media): + url: str # Remote location for file eta: Optional[str] = None + length: Optional[str] = None -class Image(BaseModel): - id: str - url: str +class Image(Media): + url: str # Remote location for file alt_text: Optional[str] = None - original_prompt: Optional[str] = None - revised_prompt: Optional[str] = None + + +class Audio(Media): + url: Optional[str] = None # Remote location for file + base64_audio: Optional[str] = None # Base64-encoded audio data + length: Optional[str] = None + + @model_validator(mode="before") + def validate_exclusive_audio(cls, data: Any): + """ + Ensure that either `url` or `base64_audio` is provided, but not both. + """ + if data.get("url") and data.get("base64_audio"): + raise ValueError("Provide either `url` or `base64_audio`, not both.") + if not data.get("url") and not data.get("base64_audio"): + raise ValueError("Either `url` or `base64_audio` must be provided.") + return data diff --git a/phi/model/google/gemini.py b/phi/model/google/gemini.py index b2de87c2e..878fcf80c 100644 --- a/phi/model/google/gemini.py +++ b/phi/model/google/gemini.py @@ -155,7 +155,7 @@ def format_messages(self, messages: List[Message]) -> List[Dict[str, Any]]: content = message.content # Initialize message_parts to be used for Gemini message_parts: List[Any] = [] - if not content or message.role in ["tool", "model"]: + if (not content or message.role in ["tool", "model"]) and hasattr(message, "parts"): message_parts = message.parts # type: ignore else: if isinstance(content, str): diff --git a/phi/model/message.py b/phi/model/message.py index 5345239e5..d4808738b 100644 --- a/phi/model/message.py +++ b/phi/model/message.py @@ -73,11 +73,12 @@ def get_content_string(self) -> str: def to_dict(self) -> Dict[str, Any]: _dict = self.model_dump( exclude_none=True, - include={"role", "content", "name", "tool_call_id", "tool_calls"}, + include={"role", "content", "audio", "name", "tool_call_id", "tool_calls"}, ) # Manually add the content field even if it is None if self.content is None: _dict["content"] = None + return _dict def log(self, level: Optional[str] = None): diff --git a/phi/model/ollama/chat.py b/phi/model/ollama/chat.py index 4c5c809da..acc744454 100644 --- a/phi/model/ollama/chat.py +++ b/phi/model/ollama/chat.py @@ -134,6 +134,11 @@ def request_kwargs(self) -> Dict[str, Any]: request_params["keep_alive"] = self.keep_alive if self.tools is not None: request_params["tools"] = self.get_tools_for_api() + # Ensure types are valid strings + for tool in request_params["tools"]: + for prop, obj in tool["function"]["parameters"]["properties"].items(): + if isinstance(obj["type"], list): + obj["type"] = obj["type"][0] if self.request_params is not None: request_params.update(self.request_params) return request_params diff --git a/phi/model/openai/chat.py b/phi/model/openai/chat.py index 3d8715d89..7b6256cb9 100644 --- a/phi/model/openai/chat.py +++ b/phi/model/openai/chat.py @@ -3,6 +3,7 @@ from typing import Optional, List, Iterator, Dict, Any, Union import httpx +from packaging import version from pydantic import BaseModel from phi.model.base import Model @@ -24,6 +25,17 @@ ChoiceDeltaToolCall, ) from openai.types.chat.chat_completion_message import ChatCompletionMessage + + MIN_OPENAI_VERSION = "1.52.0" + + # Check the installed openai version + from openai import __version__ as 
installed_version + + if version.parse(installed_version) < version.parse(MIN_OPENAI_VERSION): + logger.warning( + f"`openai` version must be >= {MIN_OPENAI_VERSION}, but found {installed_version}. " + f"Please upgrade using `pip install --upgrade openai`." + ) except (ModuleNotFoundError, ImportError): raise ImportError("`openai` not installed. Please install using `pip install openai`") @@ -59,6 +71,7 @@ def log(self): @dataclass class StreamData: response_content: str = "" + response_audio: Optional[dict] = None response_tool_calls: Optional[List[ChoiceDeltaToolCall]] = None @@ -298,8 +311,9 @@ def format_message(self, message: Message) -> Dict[str, Any]: if message.role == "user": if message.images is not None: message = self.add_images_to_message(message=message, images=message.images) - if message.audio is not None: - message = self.add_audio_to_message(message=message, audio=message.audio) + + if message.audio is not None: + message = self.add_audio_to_message(message=message, audio=message.audio) return message.to_dict() @@ -547,7 +561,7 @@ def create_assistant_message( assistant_message.tool_calls = [t.model_dump() for t in response_message.tool_calls] except Exception as e: logger.warning(f"Error processing tool calls: {e}") - if response_message.audio is not None: + if hasattr(response_message, "audio") and response_message.audio is not None: try: assistant_message.audio = response_message.audio.model_dump() except Exception as e: @@ -842,17 +856,20 @@ def response_stream(self, messages: List[Message]) -> Iterator[ModelResponse]: metrics.time_to_first_token = metrics.response_timer.elapsed response_delta: ChoiceDelta = response.choices[0].delta - response_content: Optional[str] = response_delta.content - response_tool_calls: Optional[List[ChoiceDeltaToolCall]] = response_delta.tool_calls - if response_content is not None: - stream_data.response_content += response_content - yield ModelResponse(content=response_content) + if response_delta.content is not None: + stream_data.response_content += response_delta.content + yield ModelResponse(content=response_delta.content) - if response_tool_calls is not None: + if hasattr(response_delta, "audio"): + response_audio = response_delta.audio + stream_data.response_audio = response_audio + yield ModelResponse(audio=response_audio) + + if response_delta.tool_calls is not None: if stream_data.response_tool_calls is None: stream_data.response_tool_calls = [] - stream_data.response_tool_calls.extend(response_tool_calls) + stream_data.response_tool_calls.extend(response_delta.tool_calls) if response.usage is not None: self.add_response_usage_to_metrics(metrics=metrics, response_usage=response.usage) @@ -863,6 +880,9 @@ def response_stream(self, messages: List[Message]) -> Iterator[ModelResponse]: if stream_data.response_content != "": assistant_message.content = stream_data.response_content + if stream_data.response_audio is not None: + assistant_message.audio = stream_data.response_audio + if stream_data.response_tool_calls is not None: _tool_calls = self.build_tool_calls(stream_data.response_tool_calls) if len(_tool_calls) > 0: @@ -905,23 +925,26 @@ async def aresponse_stream(self, messages: List[Message]) -> Any: # -*- Generate response metrics.response_timer.start() async for response in self.ainvoke_stream(messages=messages): - if len(response.choices) > 0: + if response.choices and len(response.choices) > 0: metrics.completion_tokens += 1 if metrics.completion_tokens == 1: metrics.time_to_first_token = metrics.response_timer.elapsed 
response_delta: ChoiceDelta = response.choices[0].delta - response_content = response_delta.content - response_tool_calls = response_delta.tool_calls - if response_content is not None: - stream_data.response_content += response_content - yield ModelResponse(content=response_content) + if response_delta.content is not None: + stream_data.response_content += response_delta.content + yield ModelResponse(content=response_delta.content) - if response_tool_calls is not None: + if hasattr(response_delta, "audio"): + response_audio = response_delta.audio + stream_data.response_audio = response_audio + yield ModelResponse(audio=response_audio) + + if response_delta.tool_calls is not None: if stream_data.response_tool_calls is None: stream_data.response_tool_calls = [] - stream_data.response_tool_calls.extend(response_tool_calls) + stream_data.response_tool_calls.extend(response_delta.tool_calls) if response.usage is not None: self.add_response_usage_to_metrics(metrics=metrics, response_usage=response.usage) @@ -932,6 +955,9 @@ async def aresponse_stream(self, messages: List[Message]) -> Any: if stream_data.response_content != "": assistant_message.content = stream_data.response_content + if stream_data.response_audio is not None: + assistant_message.audio = stream_data.response_audio + if stream_data.response_tool_calls is not None: _tool_calls = self.build_tool_calls(stream_data.response_tool_calls) if len(_tool_calls) > 0: diff --git a/phi/playground/router.py b/phi/playground/router.py index a5e1663dd..5023755bb 100644 --- a/phi/playground/router.py +++ b/phi/playground/router.py @@ -92,7 +92,7 @@ def chat_response_streamer( run_response = agent.run(message, images=images, stream=True, stream_intermediate_steps=True) for run_response_chunk in run_response: run_response_chunk = cast(RunResponse, run_response_chunk) - yield run_response_chunk.model_dump_json() + yield run_response_chunk.to_json() def process_image(file: UploadFile) -> List[Union[str, Dict]]: content = file.file.read() @@ -399,7 +399,7 @@ async def chat_response_streamer( run_response = await agent.arun(message, images=images, stream=True, stream_intermediate_steps=True) async for run_response_chunk in run_response: run_response_chunk = cast(RunResponse, run_response_chunk) - yield run_response_chunk.model_dump_json() + yield run_response_chunk.to_json() async def process_image(file: UploadFile) -> List[Union[str, Dict]]: content = file.file.read() @@ -553,8 +553,13 @@ async def run_workflow(workflow_id: str, body: WorkflowRunRequest): if workflow is None: raise HTTPException(status_code=404, detail="Workflow not found") + if body.session_id is not None: + logger.debug(f"Continuing session: {body.session_id}") + else: + logger.debug("Creating new session") + # Create a new instance of this workflow - new_workflow_instance = workflow.deep_copy(update={"workflow_id": workflow_id}) + new_workflow_instance = workflow.deep_copy(update={"workflow_id": workflow_id, "session_id": body.session_id}) new_workflow_instance.user_id = body.user_id # Return based on the response type diff --git a/phi/playground/schemas.py b/phi/playground/schemas.py index 7eb470ea4..660e210a1 100644 --- a/phi/playground/schemas.py +++ b/phi/playground/schemas.py @@ -68,3 +68,4 @@ class WorkflowRenameRequest(BaseModel): class WorkflowRunRequest(BaseModel): input: Dict[str, Any] user_id: Optional[str] = None + session_id: Optional[str] = None diff --git a/phi/run/response.py b/phi/run/response.py index a711d66dc..13e0fee5f 100644 --- a/phi/run/response.py +++ 
b/phi/run/response.py @@ -1,10 +1,11 @@ +import json from time import time from enum import Enum from typing import Optional, Any, Dict, List from pydantic import BaseModel, ConfigDict, Field -from phi.model.content import Video, Image +from phi.model.content import Video, Image, Audio from phi.reasoning.step import ReasoningStep from phi.model.message import Message, MessageReferences @@ -49,14 +50,30 @@ class RunResponse(BaseModel): session_id: Optional[str] = None workflow_id: Optional[str] = None tools: Optional[List[Dict[str, Any]]] = None - images: Optional[List[Image]] = None - videos: Optional[List[Video]] = None - audio: Optional[Dict] = None + images: Optional[List[Image]] = None # Images attached to the response + videos: Optional[List[Video]] = None # Videos attached to the response + audio: Optional[List[Audio]] = None # Audio attached to the response + response_audio: Optional[Dict] = None # Model audio response extra_data: Optional[RunResponseExtraData] = None created_at: int = Field(default_factory=lambda: int(time())) model_config = ConfigDict(arbitrary_types_allowed=True) + def to_json(self) -> str: + _dict = self.model_dump( + exclude_none=True, + exclude={"messages"}, + ) + if self.messages is not None: + _dict["messages"] = [ + m.model_dump( + exclude_none=True, + exclude={"parts"}, # Exclude what Gemini adds + ) + for m in self.messages + ] + return json.dumps(_dict, indent=2) + def to_dict(self) -> Dict[str, Any]: _dict = self.model_dump( exclude_none=True, diff --git a/phi/storage/agent/json.py b/phi/storage/agent/json.py new file mode 100644 index 000000000..b39a296ef --- /dev/null +++ b/phi/storage/agent/json.py @@ -0,0 +1,89 @@ +import json +import time +from pathlib import Path +from typing import Union, Optional, List + +from phi.storage.agent.base import AgentStorage +from phi.agent import AgentSession +from phi.utils.log import logger + + +class JsonFileAgentStorage(AgentStorage): + def __init__(self, dir_path: Union[str, Path]): + self.dir_path = Path(dir_path) + self.dir_path.mkdir(parents=True, exist_ok=True) + + def serialize(self, data: dict) -> str: + return json.dumps(data, ensure_ascii=False, indent=4) + + def deserialize(self, data: str) -> dict: + return json.loads(data) + + def create(self) -> None: + """Create the storage if it doesn't exist.""" + if not self.dir_path.exists(): + self.dir_path.mkdir(parents=True, exist_ok=True) + + def read(self, session_id: str, user_id: Optional[str] = None) -> Optional[AgentSession]: + """Read an AgentSession from storage.""" + try: + with open(self.dir_path / f"{session_id}.json", "r", encoding="utf-8") as f: + data = self.deserialize(f.read()) + if user_id and data["user_id"] != user_id: + return None + return AgentSession.model_validate(data) + except FileNotFoundError: + return None + + def get_all_session_ids(self, user_id: Optional[str] = None, agent_id: Optional[str] = None) -> List[str]: + """Get all session IDs, optionally filtered by user_id and/or agent_id.""" + session_ids = [] + for file in self.dir_path.glob("*.json"): + with open(file, "r", encoding="utf-8") as f: + data = self.deserialize(f.read()) + if (not user_id or data["user_id"] == user_id) and (not agent_id or data["agent_id"] == agent_id): + session_ids.append(data["session_id"]) + return session_ids + + def get_all_sessions(self, user_id: Optional[str] = None, agent_id: Optional[str] = None) -> List[AgentSession]: + """Get all sessions, optionally filtered by user_id and/or agent_id.""" + sessions = [] + for file in 
self.dir_path.glob("*.json"): + with open(file, "r", encoding="utf-8") as f: + data = self.deserialize(f.read()) + if (not user_id or data["user_id"] == user_id) and (not agent_id or data["agent_id"] == agent_id): + sessions.append(AgentSession.model_validate(data)) + return sessions + + def upsert(self, session: AgentSession) -> Optional[AgentSession]: + """Insert or update an AgentSession in storage.""" + try: + data = session.model_dump() + data["updated_at"] = int(time.time()) + if "created_at" not in data: + data["created_at"] = data["updated_at"] + + with open(self.dir_path / f"{session.session_id}.json", "w", encoding="utf-8") as f: + f.write(self.serialize(data)) + return session + except Exception as e: + logger.error(f"Error upserting session: {e}") + return None + + def delete_session(self, session_id: Optional[str] = None): + """Delete a session from storage.""" + if session_id is None: + return + try: + (self.dir_path / f"{session_id}.json").unlink(missing_ok=True) + except Exception as e: + logger.error(f"Error deleting session: {e}") + + def drop(self) -> None: + """Drop all sessions from storage.""" + for file in self.dir_path.glob("*.json"): + file.unlink() + + def upgrade_schema(self) -> None: + """Upgrade the schema of the storage.""" + pass diff --git a/phi/storage/agent/yaml.py b/phi/storage/agent/yaml.py new file mode 100644 index 000000000..8c855e311 --- /dev/null +++ b/phi/storage/agent/yaml.py @@ -0,0 +1,89 @@ +import yaml +import time +from pathlib import Path +from typing import Union, Optional, List + +from phi.storage.agent.base import AgentStorage +from phi.agent import AgentSession +from phi.utils.log import logger + + +class YamlFileAgentStorage(AgentStorage): + def __init__(self, dir_path: Union[str, Path]): + self.dir_path = Path(dir_path) + self.dir_path.mkdir(parents=True, exist_ok=True) + + def serialize(self, data: dict) -> str: + return yaml.dump(data, default_flow_style=False) + + def deserialize(self, data: str) -> dict: + return yaml.safe_load(data) + + def create(self) -> None: + """Create the storage if it doesn't exist.""" + if not self.dir_path.exists(): + self.dir_path.mkdir(parents=True, exist_ok=True) + + def read(self, session_id: str, user_id: Optional[str] = None) -> Optional[AgentSession]: + """Read an AgentSession from storage.""" + try: + with open(self.dir_path / f"{session_id}.yaml", "r", encoding="utf-8") as f: + data = self.deserialize(f.read()) + if user_id and data["user_id"] != user_id: + return None + return AgentSession.model_validate(data) + except FileNotFoundError: + return None + + def get_all_session_ids(self, user_id: Optional[str] = None, agent_id: Optional[str] = None) -> List[str]: + """Get all session IDs, optionally filtered by user_id and/or agent_id.""" + session_ids = [] + for file in self.dir_path.glob("*.yaml"): + with open(file, "r", encoding="utf-8") as f: + data = self.deserialize(f.read()) + if (not user_id or data["user_id"] == user_id) and (not agent_id or data["agent_id"] == agent_id): + session_ids.append(data["session_id"]) + return session_ids + + def get_all_sessions(self, user_id: Optional[str] = None, agent_id: Optional[str] = None) -> List[AgentSession]: + """Get all sessions, optionally filtered by user_id and/or agent_id.""" + sessions = [] + for file in self.dir_path.glob("*.yaml"): + with open(file, "r", encoding="utf-8") as f: + data = self.deserialize(f.read()) + if (not user_id or data["user_id"] == user_id) and (not agent_id or data["agent_id"] == agent_id): + 
sessions.append(AgentSession.model_validate(data)) + return sessions + + def upsert(self, session: AgentSession) -> Optional[AgentSession]: + """Insert or update an AgentSession in storage.""" + try: + data = session.model_dump() + data["updated_at"] = int(time.time()) + if "created_at" not in data: + data["created_at"] = data["updated_at"] + + with open(self.dir_path / f"{session.session_id}.yaml", "w", encoding="utf-8") as f: + f.write(self.serialize(data)) + return session + except Exception as e: + logger.error(f"Error upserting session: {e}") + return None + + def delete_session(self, session_id: Optional[str] = None): + """Delete a session from storage.""" + if session_id is None: + return + try: + (self.dir_path / f"{session_id}.yaml").unlink(missing_ok=True) + except Exception as e: + logger.error(f"Error deleting session: {e}") + + def drop(self) -> None: + """Drop all sessions from storage.""" + for file in self.dir_path.glob("*.yaml"): + file.unlink() + + def upgrade_schema(self) -> None: + """Upgrade the schema of the storage.""" + pass diff --git a/phi/tools/confluence.py b/phi/tools/confluence.py new file mode 100644 index 000000000..e318a34cc --- /dev/null +++ b/phi/tools/confluence.py @@ -0,0 +1,174 @@ +from phi.tools import Toolkit +from phi.utils.log import logger +from typing import Optional +from os import getenv +import json + +try: + from atlassian import Confluence +except (ModuleNotFoundError, ImportError): + raise ImportError("`atlassian-python-api` not installed. Please install using `pip install atlassian-python-api`") + + +class ConfluenceTools(Toolkit): + def __init__( + self, + username: Optional[str] = None, + password: Optional[str] = None, + url: Optional[str] = None, + api_key: Optional[str] = None, + ): + """Initialize Confluence Tools with authentication credentials. + + Args: + username (str, optional): Confluence username. Defaults to None. + password (str, optional): Confluence password. Defaults to None. + url (str, optional): Confluence instance URL. Defaults to None. + api_key (str, optional): Confluence API key. Defaults to None. + + Notes: + Credentials can be provided either through method arguments or environment variables: + - CONFLUENCE_URL + - CONFLUENCE_USERNAME + - CONFLUENCE_API_KEY + """ + + super().__init__(name="confluence_tools") + self.url = url or getenv("CONFLUENCE_URL") + self.username = username or getenv("CONFLUENCE_USERNAME") + self.password = api_key or getenv("CONFLUENCE_API_KEY") or password or getenv("CONFLUENCE_PASSWORD") + + if not self.url: + logger.error( + "Confluence URL not provided. Pass it in the constructor or set the CONFLUENCE_URL environment variable" + ) + + if not self.username: + logger.error( + "Confluence username not provided. Pass it in the constructor or set the CONFLUENCE_USERNAME environment variable" + ) + + if not self.password: + logger.error("Confluence API key or password not provided") + + self.confluence = Confluence(url=self.url, username=self.username, password=self.password) + + self.register(self.get_page_content) + self.register(self.get_space_key) + self.register(self.create_page) + self.register(self.update_page) + self.register(self.get_all_space_detail) + self.register(self.get_all_page_from_space) + + def get_page_content(self, space_name: str, page_title: str, expand: Optional[str] = "body.storage"): + """Retrieve the content of a specific page in a Confluence space. + + Args: + space_name (str): Name of the Confluence space. + page_title (str): Title of the page to retrieve. 
diff --git a/phi/tools/confluence.py b/phi/tools/confluence.py
new file mode 100644
index 000000000..e318a34cc
--- /dev/null
+++ b/phi/tools/confluence.py
@@ -0,0 +1,174 @@
+from phi.tools import Toolkit
+from phi.utils.log import logger
+from typing import Optional
+from os import getenv
+import json
+
+try:
+    from atlassian import Confluence
+except (ModuleNotFoundError, ImportError):
+    raise ImportError("`atlassian-python-api` not installed. Please install it using `pip install atlassian-python-api`")
+
+
+class ConfluenceTools(Toolkit):
+    def __init__(
+        self,
+        username: Optional[str] = None,
+        password: Optional[str] = None,
+        url: Optional[str] = None,
+        api_key: Optional[str] = None,
+    ):
+        """Initialize Confluence Tools with authentication credentials.
+
+        Args:
+            username (str, optional): Confluence username. Defaults to None.
+            password (str, optional): Confluence password. Defaults to None.
+            url (str, optional): Confluence instance URL. Defaults to None.
+            api_key (str, optional): Confluence API key. Defaults to None.
+
+        Notes:
+            Credentials can be provided either through method arguments or environment variables:
+            - CONFLUENCE_URL
+            - CONFLUENCE_USERNAME
+            - CONFLUENCE_API_KEY
+        """
+
+        super().__init__(name="confluence_tools")
+        self.url = url or getenv("CONFLUENCE_URL")
+        self.username = username or getenv("CONFLUENCE_USERNAME")
+        self.password = api_key or getenv("CONFLUENCE_API_KEY") or password or getenv("CONFLUENCE_PASSWORD")
+
+        if not self.url:
+            logger.error(
+                "Confluence URL not provided. Pass it in the constructor or set the CONFLUENCE_URL environment variable"
+            )
+
+        if not self.username:
+            logger.error(
+                "Confluence username not provided. Pass it in the constructor or set the CONFLUENCE_USERNAME environment variable"
+            )
+
+        if not self.password:
+            logger.error(
+                "Confluence API key or password not provided. Pass one in the constructor or set the CONFLUENCE_API_KEY environment variable"
+            )
+
+        self.confluence = Confluence(url=self.url, username=self.username, password=self.password)
+
+        self.register(self.get_page_content)
+        self.register(self.get_space_key)
+        self.register(self.create_page)
+        self.register(self.update_page)
+        self.register(self.get_all_space_detail)
+        self.register(self.get_all_page_from_space)
+
+    def get_page_content(self, space_name: str, page_title: str, expand: Optional[str] = "body.storage"):
+        """Retrieve the content of a specific page in a Confluence space.
+
+        Args:
+            space_name (str): Name of the Confluence space.
+            page_title (str): Title of the page to retrieve.
+            expand (str, optional): Fields to expand in the page response. Defaults to "body.storage".
+
+        Returns:
+            str: JSON-encoded page content or error message.
+        """
+        try:
+            logger.info(f"Retrieving page content from space '{space_name}'")
+            key = self.get_space_key(space_name=space_name)
+            page = self.confluence.get_page_by_title(key, page_title, expand=expand)
+            if page:
+                logger.info(f"Successfully retrieved page '{page_title}' from space '{space_name}'")
+                return json.dumps(page)
+
+            logger.warning(f"Page '{page_title}' not found in space '{space_name}'")
+            return json.dumps({"error": f"Page '{page_title}' not found in space '{space_name}'"})
+
+        except Exception as e:
+            logger.error(f"Error retrieving page '{page_title}': {e}")
+            return json.dumps({"error": str(e)})
+
+    def get_all_space_detail(self):
+        """Retrieve details about all Confluence spaces.
+
+        Returns:
+            str: List of space details as a string.
+        """
+        logger.info("Retrieving details for all Confluence spaces")
+        results = self.confluence.get_all_spaces()["results"]
+        return str(results)
+
+    def get_space_key(self, space_name: str):
+        """Get the space key for a particular Confluence space.
+
+        Args:
+            space_name (str): Name of the space whose key is required.
+
+        Returns:
+            str: Space key, or "No space found" if the space doesn't exist.
+        """
+        result = self.confluence.get_all_spaces()
+        spaces = result["results"]
+
+        for space in spaces:
+            if space["name"] == space_name:
+                logger.info(f"Found space key for '{space_name}'")
+                return space["key"]
+
+        logger.warning(f"No space named '{space_name}' found")
+        return "No space found"
+
+    def get_all_page_from_space(self, space_name: str):
+        """Retrieve all pages from a specific Confluence space.
+
+        Args:
+            space_name (str): Name of the Confluence space.
+
+        Returns:
+            str: String representation of the IDs and titles of pages in the specified space.
+        """
+        logger.info(f"Retrieving all pages from space '{space_name}'")
+        space_key = self.get_space_key(space_name)
+        page_details = self.confluence.get_all_pages_from_space(
+            space_key, status=None, expand=None, content_type="page"
+        )
+        page_details = str([{"id": page["id"], "title": page["title"]} for page in page_details])
+        return page_details
+
+    def create_page(self, space_name: str, title: str, body: str, parent_id: Optional[str] = None) -> str:
+        """Create a new page in Confluence.
+
+        Args:
+            space_name (str): Name of the Confluence space.
+            title (str): Title of the new page.
+            body (str): Content of the new page.
+            parent_id (str, optional): ID of the parent page if creating a child page. Defaults to None.
+
+        Returns:
+            str: JSON-encoded page ID and title or error message.
+        """
+        try:
+            space_key = self.get_space_key(space_name=space_name)
+            page = self.confluence.create_page(space_key, title, body, parent_id=parent_id)
+            logger.info(f"Page created: {title} with ID {page['id']}")
+            return json.dumps({"id": page["id"], "title": title})
+        except Exception as e:
+            logger.error(f"Error creating page '{title}': {e}")
+            return json.dumps({"error": str(e)})
+
+    def update_page(self, page_id: str, title: str, body: str) -> str:
+        """Update an existing Confluence page.
+
+        Args:
+            page_id (str): ID of the page to update.
+            title (str): New title for the page.
+            body (str): Updated content for the page.
+
+        Returns:
+            str: JSON-encoded status and ID of the updated page or error message.
+ """ + try: + updated_page = self.confluence.update_page(page_id, title, body) + logger.info(f"Page updated: {title} with ID {updated_page['id']}") + return json.dumps({"status": "success", "id": updated_page["id"]}) + except Exception as e: + logger.error(f"Error updating page '{title}': {e}") + return json.dumps({"error": str(e)}) diff --git a/phi/tools/eleven_labs_tools.py b/phi/tools/eleven_labs_tools.py new file mode 100644 index 000000000..8b307733f --- /dev/null +++ b/phi/tools/eleven_labs_tools.py @@ -0,0 +1,186 @@ +from base64 import b64encode +from io import BytesIO +from pathlib import Path +from typing import Iterator + +from phi.model.content import Audio +from typing import Optional, Literal +from os import getenv, path +from phi.tools import Toolkit +from phi.utils.log import logger +from phi.agent import Agent +from uuid import uuid4 + +try: + from elevenlabs import ElevenLabs # type: ignore +except ImportError: + raise ImportError("`elevenlabs` not installed. Please install using `pip install elevenlabs`") + +ElevenLabsAudioOutputFormat = Literal[ + "mp3_22050_32", # mp3 with 22.05kHz sample rate at 32kbps + "mp3_44100_32", # mp3 with 44.1kHz sample rate at 32kbps + "mp3_44100_64", # mp3 with 44.1kHz sample rate at 64kbps + "mp3_44100_96", # mp3 with 44.1kHz sample rate at 96kbps + "mp3_44100_128", # default, mp3 with 44.1kHz sample rate at 128kbps + "mp3_44100_192", # mp3 with 44.1kHz sample rate at 192kbps (Creator tier+) + "pcm_16000", # PCM format (S16LE) with 16kHz sample rate + "pcm_22050", # PCM format (S16LE) with 22.05kHz sample rate + "pcm_24000", # PCM format (S16LE) with 24kHz sample rate + "pcm_44100", # PCM format (S16LE) with 44.1kHz sample rate (Pro tier+) + "ulaw_8000", # μ-law format with 8kHz sample rate (for Twilio) +] + + +class ElevenLabsTools(Toolkit): + def __init__( + self, + voice_id: str = "JBFqnCBsd6RMkjVDRZzb", + api_key: Optional[str] = None, + target_directory: Optional[str] = None, + model_id: str = "eleven_multilingual_v2", + output_format: ElevenLabsAudioOutputFormat = "mp3_44100_64", + ): + super().__init__(name="elevenlabs_tools") + + self.api_key = api_key or getenv("ELEVEN_LABS_API_KEY") + if not self.api_key: + logger.error("ELEVEN_LABS_API_KEY not set. Please set the ELEVEN_LABS_API_KEY environment variable.") + + self.target_directory = target_directory + self.voice_id = voice_id + self.model_id = model_id + self.output_format = output_format + + if self.target_directory: + target_path = Path(self.target_directory) + target_path.mkdir(parents=True, exist_ok=True) + + self.eleven_labs_client = ElevenLabs(api_key=self.api_key) + self.register(self.get_voices) + self.register(self.generate_sound_effect) + self.register(self.text_to_speech) + + def get_voices(self) -> str: + """ + Use this function to get all the voices available. + + Returns: + result (list): A list of voices that have an ID, name and description. 
+ """ + try: + voices = self.eleven_labs_client.voices.get_all() + + response = [] + for voice in voices.voices: + response.append( + { + "id": voice.voice_id, + "name": voice.name, + "description": voice.description, + } + ) + + return str(response) + + except Exception as e: + logger.error(f"Failed to fetch voices: {e}") + return f"Error: {e}" + + def _process_audio(self, audio_generator: Iterator[bytes]) -> str: + # Step 1: Write audio data to BytesIO + audio_bytes = BytesIO() + for chunk in audio_generator: + audio_bytes.write(chunk) + audio_bytes.seek(0) # Rewind the stream + + # Step 2: Encode as Base64 + base64_audio = b64encode(audio_bytes.read()).decode("utf-8") + + # Step 3: Optionally save to disk if target_directory exists + if self.target_directory: + # Determine file extension based on output format + if self.output_format.startswith("mp3"): + extension = "mp3" + elif self.output_format.startswith("pcm"): + extension = "wav" + elif self.output_format.startswith("ulaw"): + extension = "ulaw" + else: + extension = "mp3" + + output_filename = f"{uuid4()}.{extension}" + output_path = path.join(self.target_directory, output_filename) + + # Write from BytesIO to disk + audio_bytes.seek(0) # Reset the BytesIO stream again + with open(output_path, "wb") as f: + f.write(audio_bytes.read()) + + return base64_audio + + def generate_sound_effect(self, agent: Agent, prompt: str, duration_seconds: Optional[float] = None) -> str: + """ + Use this function to generate sound effect audio from a text prompt. + + Args: + prompt (str): Text to generate audio from. + duration_seconds (Optional[float]): Duration in seconds to generate audio from. + Returns: + str: Return the path to the generated audio file. + """ + try: + audio_generator = self.eleven_labs_client.text_to_sound_effects.convert( + text=prompt, duration_seconds=duration_seconds + ) + + base64_audio = self._process_audio(audio_generator) + + # Attach to the agent + agent.add_audio( + Audio( + id=str(uuid4()), + base64_audio=base64_audio, + mime_type="audio/mpeg", + ) + ) + + return "Audio generated successfully" + + except Exception as e: + logger.error(f"Failed to generate audio: {e}") + return f"Error: {e}" + + def text_to_speech(self, agent: Agent, prompt: str, voice_id: Optional[str] = None) -> str: + """ + Use this function to convert text to speech audio. + + Args: + prompt (str): Text to generate audio from. + voice_id (Optional[str]): The ID of the voice to use for audio generation. Uses default if none is specified. + Returns: + str: Return the path to the generated audio file. + """ + try: + audio_generator = self.eleven_labs_client.text_to_speech.convert( + text=prompt, + voice_id=voice_id or self.voice_id, + model_id=self.model_id, + output_format=self.output_format, + ) + + base64_audio = self._process_audio(audio_generator) + + # Attach to the agent + agent.add_audio( + Audio( + id=str(uuid4()), + base64_audio=base64_audio, + mime_type="audio/mpeg", + ) + ) + + return "Audio generated successfully" + + except Exception as e: + logger.error(f"Failed to generate audio: {e}") + return f"Error: {e}" diff --git a/phi/tools/fal_tools.py b/phi/tools/fal_tools.py index e51eb0926..8be8d9e7f 100644 --- a/phi/tools/fal_tools.py +++ b/phi/tools/fal_tools.py @@ -27,9 +27,9 @@ def __init__( super().__init__(name="fal") self.api_key = api_key or getenv("FAL_KEY") - self.model = model if not self.api_key: logger.error("FAL_KEY not set. 
diff --git a/phi/tools/fal_tools.py b/phi/tools/fal_tools.py
index e51eb0926..8be8d9e7f 100644
--- a/phi/tools/fal_tools.py
+++ b/phi/tools/fal_tools.py
@@ -27,9 +27,9 @@ def __init__(
         super().__init__(name="fal")
 
         self.api_key = api_key or getenv("FAL_KEY")
-        self.model = model
         if not self.api_key:
             logger.error("FAL_KEY not set. Please set the FAL_KEY environment variable.")
+        self.model = model
         self.seen_logs: set[str] = set()
 
         self.register(self.generate_media)
diff --git a/phi/tools/function.py b/phi/tools/function.py
index 24d103165..a8174e147 100644
--- a/phi/tools/function.py
+++ b/phi/tools/function.py
@@ -158,7 +158,6 @@ def process_entrypoint(self, strict: bool = False):
 
         # Get JSON schema for parameters only
         parameters = get_json_schema(type_hints=param_type_hints, strict=strict)
-
         # If strict=True mark all fields as required
         # See: https://platform.openai.com/docs/guides/structured-outputs/supported-schemas#all-fields-must-be-required
         if strict:
@@ -175,7 +174,7 @@ def process_entrypoint(self, strict: bool = False):
         except Exception as e:
             logger.warning(f"Could not parse args for {self.name}: {e}", exc_info=True)
 
-        self.description = getdoc(self.entrypoint)
+        self.description = getdoc(self.entrypoint) or self.description
         self.parameters = parameters
         self.entrypoint = validate_call(self.entrypoint)
diff --git a/phi/utils/audio.py b/phi/utils/audio.py
new file mode 100644
index 000000000..af30dd146
--- /dev/null
+++ b/phi/utils/audio.py
@@ -0,0 +1,13 @@
+import base64
+
+
+def write_audio_to_file(audio, filename: str):
+    """
+    Decode base64-encoded audio and write it to disk.
+
+    :param audio: Base64-encoded audio data
+    :param filename: The filename to save the audio to
+    """
+    wav_bytes = base64.b64decode(audio)
+    with open(filename, "wb") as f:
+        f.write(wav_bytes)
diff --git a/phi/workflow/workflow.py b/phi/workflow/workflow.py
index 668930813..dccab8dac 100644
--- a/phi/workflow/workflow.py
+++ b/phi/workflow/workflow.py
@@ -321,7 +321,9 @@ def __init__(self, **data):
         self._run_parameters = {
             name: {
                 "name": name,
-                "default": param.default if param.default is not inspect.Parameter.empty else None,
+                "default": param.default.default
+                if param.default.__class__.__name__ == "FieldInfo"
+                else (param.default if param.default is not inspect.Parameter.empty else None),
                 "annotation": (
                     param.annotation.__name__
                     if hasattr(param.annotation, "__name__")
diff --git a/pyproject.toml b/pyproject.toml
index e6000c9e8..1ca8e856e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "phidata"
-version = "2.7.3"
+version = "2.7.4"
 description = "Build multi-modal Agents with memory, knowledge and tools."
 requires-python = ">=3.7"
 readme = "README.md"
@@ -98,6 +98,7 @@ module = [
     "anthropic.*",
     "apify_client.*",
     "arxiv.*",
+    "atlassian.*",
     "boto3.*",
     "botocore.*",
     "bs4.*",
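Review note on the workflow.py hunk above: it unwraps pydantic FieldInfo defaults when introspecting run() parameters. A hypothetical workflow sketching the case it fixes; the class and parameter names are illustrative only.

from pydantic import Field
from phi.workflow import Workflow, RunResponse

class BlogPostGenerator(Workflow):
    def run(self, topic: str = Field("ai agents", description="Topic to write about")) -> RunResponse:
        return RunResponse(content=f"Writing a post about {topic}...")

# Previously _run_parameters reported the FieldInfo object itself as the default;
# it now reports the unwrapped value "ai agents".
print(BlogPostGenerator()._run_parameters["topic"]["default"])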