diff --git a/phi/llm/agent/arxiv.py b/phi/llm/agent/arxiv.py deleted file mode 100644 index e7b2660bb..000000000 --- a/phi/llm/agent/arxiv.py +++ /dev/null @@ -1,53 +0,0 @@ -import json -from typing import List, Optional - -from phi.document import Document -from phi.knowledge.arxiv import ArxivKnowledgeBase -from phi.llm.agent.base import BaseAgent -from phi.utils.log import logger - - -class ArxivAgent(BaseAgent): - def __init__(self, knowledge_base: Optional[ArxivKnowledgeBase] = None): - super().__init__(name="arxiv_agent") - self.knowledge_base: Optional[ArxivKnowledgeBase] = knowledge_base - - if self.knowledge_base is not None and isinstance(self.knowledge_base, ArxivKnowledgeBase): - self.register(self.search_arxiv_and_update_knowledge_base) - else: - self.register(self.search_arxiv) - - def search_arxiv_and_update_knowledge_base(self, topic: str) -> str: - """This function searches arXiv for a topic, adds the results to the knowledge base and returns them. - - USE THIS FUNCTION TO GET INFORMATION WHICH DOES NOT EXIST. - - :param topic: The topic to search arXiv and add to knowledge base. - :return: Relevant documents from arXiv knowledge base. - """ - if self.knowledge_base is None: - return "Knowledge base not provided" - - logger.debug(f"Adding to knowledge base: {topic}") - self.knowledge_base.queries.append(topic) - logger.debug("Loading knowledge base.") - self.knowledge_base.load(recreate=False) - logger.debug(f"Searching knowledge base: {topic}") - relevant_docs: List[Document] = self.knowledge_base.search(query=topic) - return json.dumps([doc.to_dict() for doc in relevant_docs]) - - def search_arxiv(self, query: str, max_results: int = 5) -> str: - """ - Searches arXiv for a query. - - :param query: The query to search for. - :param max_results: The maximum number of results to return. - :return: Relevant documents from arXiv. - """ - from phi.document.reader.arxiv import ArxivReader - - arxiv = ArxivReader(max_results=max_results) - - logger.debug(f"Searching arxiv for: {query}") - relevant_docs: List[Document] = arxiv.read(query=query) - return json.dumps([doc.to_dict() for doc in relevant_docs]) diff --git a/phi/llm/agent/base.py b/phi/llm/agent/base.py deleted file mode 100644 index 427ae4bb7..000000000 --- a/phi/llm/agent/base.py +++ /dev/null @@ -1,27 +0,0 @@ -from collections import OrderedDict -from typing import Callable, Dict - -from phi.tool.function import Function -from phi.utils.log import logger - - -class BaseAgent: - def __init__(self, name: str = "base_agent"): - self.name: str = name - self.functions: Dict[str, Function] = OrderedDict() - - def register(self, function: Callable): - try: - f = Function.from_callable(function) - self.functions[f.name] = f - logger.debug(f"Function: {f.name} registered with {self.name}") - logger.debug(f"Json Schema: {f.to_dict()}") - except Exception as e: - logger.warning(f"Failed to create Function for: {function.__name__}") - raise e - - def __repr__(self): - return f"<{self.__class__.__name__} name={self.name} functions={list(self.functions.keys())}>" - - def __str__(self): - return self.__repr__() diff --git a/phi/llm/agent/duckdb.py b/phi/llm/agent/duckdb.py deleted file mode 100644 index 9f0abe157..000000000 --- a/phi/llm/agent/duckdb.py +++ /dev/null @@ -1,339 +0,0 @@ -from typing import Optional, Tuple - -from phi.llm.agent.base import BaseAgent -from phi.utils.log import logger - -try: - import duckdb -except ImportError: - raise ImportError("`duckdb` not installed. Please install it using `pip install duckdb`.") - - -class DuckDbAgent(BaseAgent): - def __init__( - self, - db_path: str = ":memory:", - s3_region: str = "us-east-1", - duckdb_connection: Optional[duckdb.DuckDBPyConnection] = None, - ): - super().__init__(name="duckdb_registry") - - self.db_path: str = db_path - self.s3_region: str = s3_region - self._duckdb_connection: Optional[duckdb.DuckDBPyConnection] = duckdb_connection - - self.register(self.run_duckdb_query) - self.register(self.show_tables) - self.register(self.describe_table) - self.register(self.inspect_query) - self.register(self.describe_table_or_view) - self.register(self.export_table_as) - self.register(self.summarize_table) - self.register(self.create_fts_index) - self.register(self.full_text_search) - - @property - def duckdb_connection(self) -> duckdb.DuckDBPyConnection: - """ - Returns the duckdb connection - - :return duckdb.DuckDBPyConnection: duckdb connection - """ - if self._duckdb_connection is None: - self._duckdb_connection = duckdb.connect(self.db_path) - try: - self._duckdb_connection.sql("INSTALL httpfs;") - self._duckdb_connection.sql("LOAD httpfs;") - self._duckdb_connection.sql(f"SET s3_region='{self.s3_region}';") - except Exception as e: - logger.exception(e) - logger.warning("Failed to install httpfs extension. Only local files will be supported") - - return self._duckdb_connection - - def run_duckdb_query(self, query: str) -> str: - """Function to run SQL queries against a duckdb database - - :param query: SQL query to run - :return: Result of the query - """ - - # -*- Format the SQL Query - # Remove backticks - formatted_sql = query.replace("`", "") - # If there are multiple statements, only run the first one - formatted_sql = formatted_sql.split(";")[0] - - try: - logger.debug(f"Running query: {formatted_sql}") - - query_result = self.duckdb_connection.sql(formatted_sql) - result_output = "No output" - if query_result is not None: - try: - results_as_python_objects = query_result.fetchall() - result_rows = [] - for row in results_as_python_objects: - if len(row) == 1: - result_rows.append(str(row[0])) - else: - result_rows.append(",".join(str(x) for x in row)) - - result_data = "\n".join(result_rows) - result_output = ",".join(query_result.columns) + "\n" + result_data - except AttributeError: - result_output = str(query_result) - - logger.debug(f"Query result: {result_output}") - return result_output - except duckdb.ProgrammingError as e: - return str(e) - except duckdb.Error as e: - return str(e) - except Exception as e: - return str(e) - - def show_tables(self) -> str: - """Function to show tables in the database - - :return: List of tables in the database - """ - stmt = "SHOW TABLES;" - tables = self.run_duckdb_query(stmt) - logger.debug(f"Tables: {tables}") - return tables - - def describe_table(self, table: str) -> str: - """Function to describe a table - - :param table: Table to describe - :return: Description of the table - """ - stmt = f"DESCRIBE {table};" - table_description = self.run_duckdb_query(stmt) - - logger.debug(f"Table description: {table_description}") - return f"{table}\n{table_description}" - - def summarize_table(self, table: str) -> str: - """Function to summarize the contents of a table - - :param table: Table to describe - :return: Description of the table - """ - stmt = f"SUMMARIZE SELECT * FROM {table};" - table_description = self.run_duckdb_query(stmt) - - logger.debug(f"Table description: {table_description}") - return f"{table}\n{table_description}" - - def inspect_query(self, query: str) -> str: - """Function to inspect a query and return the query plan. Always inspect your query before running them. - - :param query: Query to inspect - :return: Qeury plan - """ - stmt = f"explain {query};" - explain_plan = self.run_duckdb_query(stmt) - - logger.debug(f"Explain plan: {explain_plan}") - return explain_plan - - def describe_table_or_view(self, table: str): - """Function to describe a table or view - - :param table: Table or view to describe - :return: Description of the table or view - """ - stmt = f"select column_name, data_type from information_schema.columns where table_name='{table}';" - table_description = self.run_duckdb_query(stmt) - - logger.debug(f"Table description: {table_description}") - return f"{table}\n{table_description}" - - def load_local_path_to_table(self, path: str, table_name: Optional[str] = None) -> Tuple[str, str]: - """Load a local file into duckdb - - :param path: Path to load - :param table_name: Optional table name to use - :return: Table name, SQL statement used to load the file - """ - import os - - logger.debug(f"Loading {path} into duckdb") - - if table_name is None: - # Get the file name from the s3 path - file_name = path.split("/")[-1] - # Get the file name without extension from the s3 path - table_name, extension = os.path.splitext(file_name) - # If the table_name isn't a valid SQL identifier, we'll need to use something else - table_name = table_name.replace("-", "_").replace(".", "_").replace(" ", "_").replace("/", "_") - - create_statement = f"CREATE OR REPLACE TABLE '{table_name}' AS SELECT * FROM '{path}';" - self.run_duckdb_query(create_statement) - - logger.debug(f"Loaded {path} into duckdb as {table_name}") - # self.run_duckdb_query(f"SELECT * from {table_name};") - return table_name, create_statement - - def load_local_csv_to_table( - self, path: str, table_name: Optional[str] = None, delimiter: Optional[str] = None - ) -> Tuple[str, str]: - """Load a local CSV file into duckdb - - :param path: Path to load - :param table_name: Optional table name to use - :param delimiter: Optional delimiter to use - :return: Table name, SQL statement used to load the file - """ - import os - - logger.debug(f"Loading {path} into duckdb") - - if table_name is None: - # Get the file name from the s3 path - file_name = path.split("/")[-1] - # Get the file name without extension from the s3 path - table_name, extension = os.path.splitext(file_name) - # If the table_name isn't a valid SQL identifier, we'll need to use something else - table_name = table_name.replace("-", "_").replace(".", "_").replace(" ", "_").replace("/", "_") - - select_statement = f"SELECT * FROM read_csv('{path}'" - if delimiter is not None: - select_statement += f", delim='{delimiter}')" - else: - select_statement += ")" - - create_statement = f"CREATE OR REPLACE TABLE '{table_name}' AS {select_statement};" - self.run_duckdb_query(create_statement) - - logger.debug(f"Loaded CSV {path} into duckdb as {table_name}") - # self.run_duckdb_query(f"SELECT * from {table_name};") - return table_name, create_statement - - def load_s3_path_to_table(self, s3_path: str, table_name: Optional[str] = None) -> Tuple[str, str]: - """Load a file from S3 into duckdb - - :param s3_path: S3 path to load - :param table_name: Optional table name to use - :return: Table name, SQL statement used to load the file - """ - import os - - logger.debug(f"Loading {s3_path} into duckdb") - - if table_name is None: - # Get the file name from the s3 path - file_name = s3_path.split("/")[-1] - # Get the file name without extension from the s3 path - table_name, extension = os.path.splitext(file_name) - # If the table_name isn't a valid SQL identifier, we'll need to use something else - table_name = table_name.replace("-", "_").replace(".", "_").replace(" ", "_").replace("/", "_") - - create_statement = f"CREATE OR REPLACE TABLE '{table_name}' AS SELECT * FROM '{s3_path}';" - self.run_duckdb_query(create_statement) - - logger.debug(f"Loaded {s3_path} into duckdb as {table_name}") - # self.run_duckdb_query(f"SELECT * from {table_name};") - return table_name, create_statement - - def load_s3_csv_to_table( - self, s3_path: str, table_name: Optional[str] = None, delimiter: Optional[str] = None - ) -> Tuple[str, str]: - """Load a CSV file from S3 into duckdb - - :param s3_path: S3 path to load - :param table_name: Optional table name to use - :return: Table name, SQL statement used to load the file - """ - import os - - logger.debug(f"Loading {s3_path} into duckdb") - - if table_name is None: - # Get the file name from the s3 path - file_name = s3_path.split("/")[-1] - # Get the file name without extension from the s3 path - table_name, extension = os.path.splitext(file_name) - # If the table_name isn't a valid SQL identifier, we'll need to use something else - table_name = table_name.replace("-", "_").replace(".", "_").replace(" ", "_").replace("/", "_") - - select_statement = f"SELECT * FROM read_csv('{s3_path}'" - if delimiter is not None: - select_statement += f", delim='{delimiter}')" - else: - select_statement += ")" - - create_statement = f"CREATE OR REPLACE TABLE '{table_name}' AS {select_statement};" - self.run_duckdb_query(create_statement) - - logger.debug(f"Loaded CSV {s3_path} into duckdb as {table_name}") - # self.run_duckdb_query(f"SELECT * from {table_name};") - return table_name, create_statement - - def export_table_as(self, table_name: str, format: Optional[str] = "PARQUET", path: Optional[str] = None) -> str: - """Save a table to a desired format - The function will use the default format as parquet - If the path is provided, the table will be exported to that path, example s3 - - :param table_name: Table to export - :param format: Format to export to - :param path: Path to export to - :return: None - """ - if format is None: - format = "PARQUET" - - logger.debug(f"Exporting Table {table_name} as {format.upper()} in the path {path}") - # self.run_duckdb_query(f"SELECT * from {table_name};") - if path is None: - path = f"{table_name}.{format}" - else: - path = f"{path}/{table_name}.{format}" - export_statement = f"COPY (SELECT * FROM {table_name}) TO '{path}' (FORMAT {format.upper()});" - result = self.run_duckdb_query(export_statement) - logger.debug(f"Exported {table_name} to {path}/{table_name}") - - return result - - def create_fts_index(self, table_name: str, unique_key: str, input_values: list[str]) -> str: - """Create a full text search index on a table - - :param table_name: Table to create the index on - :param unique_key: Unique key to use - :param input_values: Values to index - :return: None - """ - logger.debug(f"Creating FTS index on {table_name} for {input_values}") - self.run_duckdb_query("INSTALL fts;") - logger.debug("Installed FTS extension") - self.run_duckdb_query("LOAD fts;") - logger.debug("Loaded FTS extension") - - create_fts_index_statement = f"PRAGMA create_fts_index('{table_name}', '{unique_key}', '{input_values}');" - logger.debug(f"Running {create_fts_index_statement}") - result = self.run_duckdb_query(create_fts_index_statement) - logger.debug(f"Created FTS index on {table_name} for {input_values}") - - return result - - def full_text_search(self, table_name: str, unique_key: str, search_text: str) -> str: - """Full text Search in a table column for a specific text/keyword - - :param table_name: Table to search - :param unique_key: Unique key to use - :param search_text: Text to search - :return: None - """ - logger.debug(f"Running full_text_search for {search_text} in {table_name}") - search_text_statement = f"""SELECT fts_main_corpus.match_bm25({unique_key}, '{search_text}') AS score,* - FROM {table_name} - WHERE score IS NOT NULL - ORDER BY score;""" - - logger.debug(f"Running {search_text_statement}") - result = self.run_duckdb_query(search_text_statement) - logger.debug(f"Search results for {search_text} in {table_name}") - - return result diff --git a/phi/llm/agent/email.py b/phi/llm/agent/email.py deleted file mode 100644 index 1df6a7201..000000000 --- a/phi/llm/agent/email.py +++ /dev/null @@ -1,59 +0,0 @@ -from typing import Optional - -from phi.llm.agent.base import BaseAgent -from phi.utils.log import logger - - -class EmailAgent(BaseAgent): - def __init__( - self, - receiver_email: Optional[str] = None, - sender_name: Optional[str] = None, - sender_email: Optional[str] = None, - sender_passkey: Optional[str] = None, - ): - super().__init__(name="email_agent") - self.receiver_email: Optional[str] = receiver_email - self.sender_name: Optional[str] = sender_name - self.sender_email: Optional[str] = sender_email - self.sender_passkey: Optional[str] = sender_passkey - self.register(self.email_user) - - def email_user(self, subject: str, body: str) -> str: - """Emails the user with the given subject and body. - - :param subject: The subject of the email. - :param body: The body of the email. - :return: "success" if the email was sent successfully, "error: [error message]" otherwise. - """ - try: - import smtplib - from email.message import EmailMessage - except ImportError: - logger.error("`smtplib` not installed") - raise - - if not self.receiver_email: - return "error: No receiver email provided" - if not self.sender_name: - return "error: No sender name provided" - if not self.sender_email: - return "error: No sender email provided" - if not self.sender_passkey: - return "error: No sender passkey provided" - - msg = EmailMessage() - msg["Subject"] = subject - msg["From"] = f"{self.sender_name} <{self.sender_email}>" - msg["To"] = self.receiver_email - msg.set_content(body) - - logger.info(f"Sending Email to {self.receiver_email}") - try: - with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp: - smtp.login(self.sender_email, self.sender_passkey) - smtp.send_message(msg) - except Exception as e: - logger.error(f"Error sending email: {e}") - return f"error: {e}" - return "email sent successfully" diff --git a/phi/llm/agent/google.py b/phi/llm/agent/google.py deleted file mode 100644 index 79f3ca7f3..000000000 --- a/phi/llm/agent/google.py +++ /dev/null @@ -1,18 +0,0 @@ -from phi.llm.agent.base import BaseAgent -from phi.utils.log import logger - - -class GoogleAgent(BaseAgent): - def __init__(self): - super().__init__(name="google_agent") - self.register(self.get_result_from_google) - - def get_result_from_google(self, query: str) -> str: - """Gets the result for a query from Google. - Use this function to find an answer when not available in the knowledge base. - - :param query: The query to search for. - :return: The result from Google. - """ - logger.info(f"Searching google for: {query}") - return "Sorry, this capability is not available yet." diff --git a/phi/llm/agent/phi.py b/phi/llm/agent/phi.py deleted file mode 100644 index 872e4d6ea..000000000 --- a/phi/llm/agent/phi.py +++ /dev/null @@ -1,116 +0,0 @@ -import uuid -from typing import Optional - -from phi.llm.agent.base import BaseAgent -from phi.utils.log import logger - - -class PhiAgent(BaseAgent): - def __init__(self): - super().__init__(name="phi_agent") - self.register(self.create_new_app) - self.register(self.start_user_workspace) - self.register(self.validate_phi_is_ready) - - def validate_phi_is_ready(self) -> bool: - """Validates that Phi is ready to run commands. - - :return: True if Phi is ready, False otherwise. - """ - # Check if docker is running - return True - - def create_new_app(self, template: str, workspace_name: str) -> str: - """Creates a new phidata workspace for a given application template. - Use this function when the user wants to create a new "llm-app", "api-app", "django-app", or "streamlit-app". - Remember to provide a name for the new workspace. - You can use the format: "template-name" + name of an interesting person (lowercase, no spaces). - - :param template: (required) The template to use for the new application. - One of: llm-app, api-app, django-app, streamlit-app - :param workspace_name: (required) The name of the workspace to create for the new application. - :return: Status of the function or next steps. - """ - from phi.workspace.operator import create_workspace, TEMPLATE_TO_NAME_MAP, WorkspaceStarterTemplate - - ws_template: Optional[WorkspaceStarterTemplate] = None - if template.lower() in WorkspaceStarterTemplate.__members__.values(): - ws_template = WorkspaceStarterTemplate(template) - - if ws_template is None: - return f"Error: Invalid template: {template}, must be one of: llm-app, api-app, django-app, streamlit-app" - - ws_dir_name: Optional[str] = workspace_name - if ws_dir_name is None: - # Get default_ws_name from template - default_ws_name: Optional[str] = TEMPLATE_TO_NAME_MAP.get(ws_template) - # Add a 2 digit random suffix to the default_ws_name - random_suffix = str(uuid.uuid4())[:2] - default_ws_name = f"{default_ws_name}-{random_suffix}" - - return ( - f"Ask the user for a name for the app directory with the default value: {default_ws_name}." - f"Ask the user to input YES or NO to use the default value." - ) - # # Ask user for workspace name if not provided - # ws_dir_name = Prompt.ask("Please provide a name for the app", default=default_ws_name, console=console) - - logger.info(f"Creating: {template} at {ws_dir_name}") - try: - create_successful = create_workspace(name=ws_dir_name, template=ws_template.value) - if create_successful: - return ( - f"Successfully created a {ws_template.value} at {ws_dir_name}. " - f"Ask the user if they want to start the app now." - ) - else: - return f"Error: Failed to create {template}" - except Exception as e: - return f"Error: {e}" - - def start_user_workspace(self, workspace_name: Optional[str] = None) -> str: - """Starts the workspace for a user. Use this function when the user wants to start a given workspace. - If the workspace name is not provided, the function will start the active workspace. - Otherwise, it will start the workspace with the given name. - - :param workspace_name: The name of the workspace to start - :return: Status of the function or next steps. - """ - from phi.cli.config import PhiCliConfig - from phi.infra.type import InfraType - from phi.workspace.config import WorkspaceConfig - from phi.workspace.operator import start_workspace - - phi_config: Optional[PhiCliConfig] = PhiCliConfig.from_saved_config() - if not phi_config: - return "Error: Phi not initialized. Please run `phi ai` again" - - workspace_config_to_start: Optional[WorkspaceConfig] = None - active_ws_config: Optional[WorkspaceConfig] = phi_config.get_active_ws_config() - - if workspace_name is None: - if active_ws_config is None: - return "Error: No active workspace found. Please create a workspace first." - workspace_config_to_start = active_ws_config - else: - workspace_config_by_name: Optional[WorkspaceConfig] = phi_config.get_ws_config_by_dir_name(workspace_name) - if workspace_config_by_name is None: - return f"Error: Could not find a workspace with name: {workspace_name}" - workspace_config_to_start = workspace_config_by_name - - # Set the active workspace to the workspace to start - if active_ws_config is not None and active_ws_config.ws_root_path != workspace_config_by_name.ws_root_path: - phi_config.set_active_ws_dir(workspace_config_by_name.ws_root_path) - active_ws_config = workspace_config_by_name - - try: - start_workspace( - phi_config=phi_config, - ws_config=workspace_config_to_start, - target_env="dev", - target_infra=InfraType.docker, - auto_confirm=True, - ) - return f"Successfully started workspace: {workspace_config_to_start.ws_root_path.stem}" - except Exception as e: - return f"Error: {e}" diff --git a/phi/llm/agent/pubmed.py b/phi/llm/agent/pubmed.py deleted file mode 100644 index 15c04e97b..000000000 --- a/phi/llm/agent/pubmed.py +++ /dev/null @@ -1,19 +0,0 @@ -from phi.llm.agent.base import BaseAgent -from phi.utils.log import logger - - -class PubMedAgent(BaseAgent): - def __init__(self): - super().__init__(name="pubmed_agent") - self.register(self.get_articles_from_pubmed) - - def get_articles_from_pubmed(self, query: str, num_articles: int = 2) -> str: - """Gets the abstract for articles related to a query from PubMed: a database of biomedical literature - Use this function to find information about a medical concept when not available in the knowledge base or Google - - :param query: The query to get related articles for. - :param num_articles: The number of articles to return. - :return: JSON string containing the articles - """ - logger.info(f"Searching Pubmed for: {query}") - return "Sorry, this capability is not available yet." diff --git a/phi/llm/agent/shell.py b/phi/llm/agent/shell.py deleted file mode 100644 index 4cb4dcc29..000000000 --- a/phi/llm/agent/shell.py +++ /dev/null @@ -1,34 +0,0 @@ -from typing import List - -from phi.llm.agent.base import BaseAgent -from phi.utils.log import logger - - -class ShellAgent(BaseAgent): - def __init__(self): - super().__init__(name="shell_agent") - self.register(self.run_shell_command) - - def run_shell_command(self, args: List[str], tail: int = 100) -> str: - """Runs a shell command and returns the output or error. - - :param args: The command to run as a list of strings. - :param tail: The number of lines to return from the output. - :return: The output of the command. - """ - logger.info(f"Running shell command: {args}") - - import subprocess - - try: - result = subprocess.run(args, capture_output=True, text=True) - logger.debug(f"Result: {result}") - logger.debug(f"Return code: {result.returncode}") - if result.returncode != 0: - return f"Error: {result.stderr}" - - # return only the last n lines of the output - return "\n".join(result.stdout.split("\n")[-tail:]) - except Exception as e: - logger.warning(f"Failed to run shell command: {e}") - return f"Error: {e}" diff --git a/phi/llm/agent/website.py b/phi/llm/agent/website.py index 5620b0393..652a77ff7 100644 --- a/phi/llm/agent/website.py +++ b/phi/llm/agent/website.py @@ -1,50 +1,2 @@ -import json -from typing import List, Optional - -from phi.document import Document -from phi.knowledge.website import WebsiteKnowledgeBase -from phi.llm.agent.base import BaseAgent -from phi.utils.log import logger - - -class WebsiteAgent(BaseAgent): - def __init__(self, knowledge_base: Optional[WebsiteKnowledgeBase] = None): - super().__init__(name="website_agent") - self.knowledge_base: Optional[WebsiteKnowledgeBase] = knowledge_base - - if self.knowledge_base is not None and isinstance(self.knowledge_base, WebsiteKnowledgeBase): - self.register(self.add_website_to_knowledge_base) - else: - self.register(self.read_website) - - def add_website_to_knowledge_base(self, url: str) -> str: - """This function adds a websites content to the knowledge base. - NOTE: The website must start with https:// and should be a valid website. - - USE THIS FUNCTION TO GET INFORMATION ABOUT PRODUCTS FROM THE INTERNET. - - :param url: The url of the website to add. - :return: 'Success' if the website was added to the knowledge base. - """ - if self.knowledge_base is None: - return "Knowledge base not provided" - - logger.debug(f"Adding to knowledge base: {url}") - self.knowledge_base.urls.append(url) - logger.debug("Loading knowledge base.") - self.knowledge_base.load(recreate=False) - return "Success" - - def read_website(self, url: str) -> str: - """This function reads a website and returns the content. - - :param url: The url of the website to read. - :return: Relevant documents from the website. - """ - from phi.document.reader.website import WebsiteReader - - website = WebsiteReader() - - logger.debug(f"Reading website: {url}") - relevant_docs: List[Document] = website.read(url=url) - return json.dumps([doc.to_dict() for doc in relevant_docs]) +# DEPRECATED: Use phi.agent.website instead +from phi.agent.website import WebsiteAgent, WebsiteKnowledgeBase # noqa: F401 diff --git a/phi/llm/agent/wikipedia.py b/phi/llm/agent/wikipedia.py deleted file mode 100644 index ff2bb2377..000000000 --- a/phi/llm/agent/wikipedia.py +++ /dev/null @@ -1,54 +0,0 @@ -import json -from typing import List, Optional - -from phi.document import Document -from phi.knowledge.wikipedia import WikipediaKnowledgeBase -from phi.llm.agent.base import BaseAgent -from phi.utils.log import logger - - -class WikipediaAgent(BaseAgent): - def __init__(self, knowledge_base: Optional[WikipediaKnowledgeBase] = None): - super().__init__(name="wikipedia_agent") - self.knowledge_base: Optional[WikipediaKnowledgeBase] = knowledge_base - - if self.knowledge_base is not None and isinstance(self.knowledge_base, WikipediaKnowledgeBase): - self.register(self.search_wikipedia_and_update_knowledge_base) - else: - self.register(self.search_wikipedia) - - def search_wikipedia_and_update_knowledge_base(self, topic: str) -> str: - """This function searches wikipedia for a topic, adds the results to the knowledge base and returns them. - - USE THIS FUNCTION TO GET INFORMATION WHICH DOES NOT EXIST. - - :param topic: The topic to search Wikipedia and add to knowledge base. - :return: Relevant documents from Wikipedia knowledge base. - """ - - if self.knowledge_base is None: - return "Knowledge base not provided" - - logger.debug(f"Adding to knowledge base: {topic}") - self.knowledge_base.topics.append(topic) - logger.debug("Loading knowledge base.") - self.knowledge_base.load(recreate=False) - logger.debug(f"Searching knowledge base: {topic}") - relevant_docs: List[Document] = self.knowledge_base.search(query=topic) - return json.dumps([doc.to_dict() for doc in relevant_docs]) - - def search_wikipedia(self, query: str) -> str: - """Searches Wikipedia for a query. - - :param query: The query to search for. - :return: Relevant documents from wikipedia. - """ - try: - import wikipedia # noqa: F401 - except ImportError: - raise ImportError( - "The `wikipedia` package is not installed. " "Please install it via `pip install wikipedia`." - ) - - logger.info(f"Searching wikipedia for: {query}") - return json.dumps(Document(name=query, content=wikipedia.summary(query)).to_dict())