From c3707c585aa5115cb4622d13d6842520216bc3e2 Mon Sep 17 00:00:00 2001 From: deepakachu-aiplanet Date: Mon, 2 Sep 2024 02:59:39 +0530 Subject: [PATCH 01/14] added longterm memory feature --- pyproject.toml | 3 + src/openagi/actions/base.py | 3 +- src/openagi/actions/human_input.py | 4 +- src/openagi/agent.py | 133 +++++++++++++++++++++++-- src/openagi/cli.py | 31 ++++++ src/openagi/memory/base.py | 106 +++++++++++++++++++- src/openagi/memory/sessiondict.py | 17 ++++ src/openagi/planner/base.py | 5 +- src/openagi/planner/task_decomposer.py | 20 +++- src/openagi/prompts/ltm.py | 15 +++ src/openagi/prompts/task_creator.py | 45 +++++++-- src/openagi/storage/base.py | 8 +- 12 files changed, 360 insertions(+), 30 deletions(-) create mode 100644 src/openagi/cli.py create mode 100644 src/openagi/memory/sessiondict.py create mode 100644 src/openagi/prompts/ltm.py diff --git a/pyproject.toml b/pyproject.toml index 6541956..37ba6d3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,6 +5,7 @@ description = "" authors = ["AI Planet "] readme = "README.md" include = ["src/*"] +packages = [{ include = "openagi", from = "src" }] # This tells Poetry where to find the package [tool.poetry.dependencies] python = "^3.9, <3.12" @@ -49,6 +50,8 @@ ruff = "^0.1.11" requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" +[tool.poetry.scripts] +openagi = "openagi.cli:main" [tool.ruff] ignore-init-module-imports = true diff --git a/src/openagi/actions/base.py b/src/openagi/actions/base.py index 31bf1b7..f7925ce 100644 --- a/src/openagi/actions/base.py +++ b/src/openagi/actions/base.py @@ -24,7 +24,8 @@ class BaseAction(BaseModel): ) def execute(self): - """Executes the action""" + """Executes the action + """ raise NotImplementedError("Subclasses must implement this method.") @classmethod diff --git a/src/openagi/actions/human_input.py b/src/openagi/actions/human_input.py index 512ebea..b3d3b4a 100644 --- a/src/openagi/actions/human_input.py +++ b/src/openagi/actions/human_input.py @@ -9,6 +9,6 @@ class HumanCLIInput(BaseAction): description="question to be asked to human", ) - def execute(self): - response = input(f"Agent: {self.ques_prompt}\nYou: ") + def execute(self, prompt=ques_prompt): + response = input(f"Agent: {prompt}\nYou: ") return response diff --git a/src/openagi/agent.py b/src/openagi/agent.py index 11a4b59..499a972 100644 --- a/src/openagi/agent.py +++ b/src/openagi/agent.py @@ -1,8 +1,12 @@ import logging +import pprint from enum import Enum from textwrap import dedent -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union, Tuple + +from click import prompt from pydantic import BaseModel, Field, field_validator +from transformers.utils.hub import SESSION_ID from openagi.actions.base import BaseAction from openagi.actions.compressor import SummarizerAction @@ -23,12 +27,17 @@ from openagi.utils.helper import get_default_llm from openagi.utils.tool_list import get_tool_list from openagi.worker import Worker +from openagi.memory.sessiondict import SessionDict +from openagi.actions.human_input import HumanCLIInput +from openagi.prompts.ltm import LTMFormatPrompt class OutputFormat(str, Enum): markdown = "markdown" raw_text = "raw_text" +session = None + class Admin(BaseModel): planner: Optional[BasePlanner] = Field( description="Type of planner to use for task decomposition.", @@ -64,6 +73,9 @@ class Admin(BaseModel): description="Key to be used to store the output.", ) + input_action: Optional[HumanCLIInput] = 
Field(default_factory=HumanCLIInput, + description="To get feedback in case ltm has been enabled") + def model_post_init(self, __context: Any) -> None: model = super().model_post_init(__context) @@ -104,7 +116,7 @@ def assign_workers(self, workers: List[Worker]): else: self.workers.extend(workers) - def run_planner(self, query: str, descripton: str): + def run_planner(self, query: str, descripton: str, long_term_context: str): if self.planner: if not getattr(self.planner, "llm", False): setattr(self.planner, "llm", self.llm) @@ -120,10 +132,17 @@ def run_planner(self, query: str, descripton: str): workers_dict = [] for worker in self.workers: workers_dict.append(worker.worker_doc()) +######################################################################################################################## + for action in worker.actions: + actions_dict.append(action.cls_doc()) +######################################################################################################################## return self.planner.plan( query=query, description=descripton, +######################################################################################################################## + long_term_context=long_term_context, +######################################################################################################################## supported_actions=actions_dict, supported_workers=workers_dict, ) @@ -200,14 +219,14 @@ def _provoke_thought_obs(self, observation): thoughts = dedent(f"""Observation: {observation}""".strip()) return thoughts - def _should_continue(self, llm_resp: str) -> Union[bool, Optional[Dict]]: + def _should_continue(self, llm_resp: str) -> Tuple[bool, Optional[Dict]]: output: Dict = get_last_json(llm_resp, llm=self.llm, max_iterations=self.max_iterations) output_key_exists = bool(output and output.get(self.output_key)) return (not output_key_exists, output) def _force_output( self, llm_resp: str, all_thoughts_and_obs: List[str] - ) -> Union[bool, Optional[str]]: + ) -> Tuple[bool, Optional[str]]: """Force the output once the max iterations are reached.""" prompt = ( "\n".join(all_thoughts_and_obs) @@ -420,11 +439,72 @@ def single_agent_execution(self, query: str, description: str, task_lists: TaskL logging.debug(f"Execution Completed for Session ID - {self.memory.session_id}") return output + def run(self, query: str, description: str): logging.info("Running Admin Agent...") logging.info(f"SessionID - {self.memory.session_id}") - planned_tasks = self.run_planner(query=query, descripton=description) + """ + ltm_context = "" + bad_feedback = False + bad_session = None + if ltm enabled: + retrieve ltm + for sessions in ltm: + if any sim(session, query) > threshold + if no negative feedback: + ans = session.answer + take feedback + update session with new feedback ##UPDATION + return ans + else: + ltm_context += session + bad_feedback = True + bad_session = session + break + ltm_context += session + + planned_tasks = run_planner(query, description, ltm_contText) + + ans = execute(planned_tasks) + + if ltm enabled: + if bad_feedback: + update bad_session with new feedback ##UPDATION + else: + save session in ltm ##ADD + return ans + """ + ltm = ["None"] + bad_feedback = False + bad_session = None + if self.memory.long_term: + logging.info("Retrieving similar queries from long term memory...") + similar_sessions = self.memory.get_ltm(query) + ltm = [] + for memory in similar_sessions: + metadata = memory["metadata"] + print(memory["similarity_score"]) + if 
memory["similarity_score"] >= self.memory.ltm_threshold: + if metadata["ans_feedback"]=='' and metadata["plan_feedback"]=='': + logging.info("Found a very similar query in long term memory without negative feedback, returning answer directly") + result = memory["document"] + # ask for feedback here and UPDATE the response + # write for case when threshold is crossed but negative feedback + session = SessionDict.from_dict(metadata) + self.save_ltm("update", session) + return result + else: + ltm.append(LTMFormatPrompt().base_prompt.format(**metadata)) + bad_feedback = True + bad_session = SessionDict.from_dict(metadata) + break + ltm.append(LTMFormatPrompt().base_prompt.format(**metadata)) + + old_context = "\n\n".join(ltm) + + planned_tasks = self.run_planner(query=query, descripton=description, long_term_context=old_context) + logging.info("Tasks Planned...") logging.debug(f"{planned_tasks=}") @@ -434,20 +514,34 @@ def run(self, query: str, description: str): self.memory.save_planned_tasks(tasks=list(task_lists.tasks.queue)) if self.planner.autonomous: - return self.auto_workers_assignment( + result = self.auto_workers_assignment( query=query, description=description, task_lists=task_lists ) else: if self.workers: - return self.worker_task_execution( + result = self.worker_task_execution( query=query, description=description, task_lists=task_lists, ) else: - return self.single_agent_execution( + result = self.single_agent_execution( query=query, description=description, task_lists=task_lists ) + # Human feedback part + if self.memory.long_term: + if bad_feedback: + self.save_ltm("update", bad_session) + else: + session = SessionDict( + query=query, + description=description, + plan=str(planned_tasks), + session_id=self.memory.session_id, + answer=result + ) + self.save_ltm("add", session) + return result def _can_task_execute(self, llm_resp: str) -> Union[bool, Optional[str]]: content: str = find_last_r_failure_content(text=llm_resp) @@ -471,3 +565,26 @@ def get_supported_actions_for_worker(self, actions_list: List[str],tool_list: Li matching_classes.append(action) return matching_classes + + def save_ltm(self, type:str, session:SessionDict): + """ + Save a session to the long term memory, either by adding or by updating an existing session. + :param session: The SessionDict object that has all the details of the session + """ + session.plan_feedback = self.input_action.execute( + prompt=f"Please review the plan generated: \n{session.plan}\nIf you are satisfied " + f"with the plan, press enter, else please provide a detailed description of the " + f"problem, along with how the plan could have been better:") + + session.ans_feedback = self.input_action.execute( + prompt=f"Please review the answer generated: \n{session.answer}\nIf you are satisfied" + f"with the answer, press enter, else please provide a detailed description of" + f"the problem, along with how the answer could have been better:") + if type == "add": + self.memory.add_ltm(session) + logging.info(f"Session saved to long term memory: {session}") + elif type == "update": + self.memory.update_ltm(session) + logging.info(f"Session updated in long term memory: {session}") + else: + raise ValueError("Invalid type. 
Please provide either 'add' or 'update'.") diff --git a/src/openagi/cli.py b/src/openagi/cli.py new file mode 100644 index 0000000..7413783 --- /dev/null +++ b/src/openagi/cli.py @@ -0,0 +1,31 @@ +import argparse +import logging +import os +from openagi.memory.base import BaseMemory + + +def clear_long_term_memory(): + """Clears the long-term memory directory using environment variables.""" + long_term_dir = os.getenv("LONG_TERM_DIR", ".long_term_dir") + BaseMemory.clear_long_term_memory(long_term_dir) + + +def main(): + parser = argparse.ArgumentParser(description="OpenAGI CLI for various commands.") + + parser.add_argument( + "--clear-ltm", + action="store_true", + help="Clear the long-term memory directory." + ) + + args = parser.parse_args() + + if args.clear_ltm: + clear_long_term_memory() + else: + parser.print_help() + + +if __name__ == "__main__": + main() diff --git a/src/openagi/memory/base.py b/src/openagi/memory/base.py index 3b1f6e5..b097618 100644 --- a/src/openagi/memory/base.py +++ b/src/openagi/memory/base.py @@ -1,13 +1,14 @@ import logging -from typing import Any, Dict +from typing import Any, Dict, List from uuid import uuid4 - +import os, shutil from pydantic import BaseModel, Field from openagi.storage.base import BaseStorage from openagi.storage.chroma import ChromaStorage from openagi.tasks.lists import TaskLists from openagi.tasks.task import Task +from openagi.memory.sessiondict import SessionDict class BaseMemory(BaseModel): @@ -17,11 +18,55 @@ class BaseMemory(BaseModel): description="Storage to be used for the Memory.", exclude=True, ) + ltm_storage: BaseStorage = Field( + default_factory=lambda: ChromaStorage, + description="Long-term storage to be used for the Memory.", + exclude=True, + ) + + long_term: bool = Field(default=False, description="Whether or not to use long term memory") + ltm_threshold: float = Field(default=0.7, + description="Semantic similarity threshold for long term memory instance retrieval") + + long_term_dir: str = Field(default=None, description="Path to directory for long-term memory storage") def __init__(self, **data: Any): super().__init__(**data) self.storage = ChromaStorage.from_kwargs(collection_name=self.session_id) + + # Set long_term_dir from environment variable if not provided + if self.long_term_dir is None: + self.long_term_dir = os.getenv("LONG_TERM_DIR", ".long_term_dir") + + if self.long_term: + os.makedirs(self.long_term_dir, exist_ok=True) + + self.ltm_storage = ChromaStorage.from_kwargs( + collection_name="long_term_memory", + persist_path=self.long_term_dir + ) + assert 1 >= self.ltm_threshold >= 0.7, "Semantic similarity threshold should be between 0.7 and 1" + logging.info(f"Session ID initialized: {self.session_id}") + if self.long_term: + logging.info(f"Long-term memory enabled. Using directory: {self.long_term_dir}") + + @staticmethod + def clear_long_term_memory(directory: str): + """Clears all data from the specified long-term memory directory.""" + if os.path.exists(directory): + for filename in os.listdir(directory): + file_path = os.path.join(directory, filename) + try: + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + shutil.rmtree(file_path) + except Exception as e: + logging.error(f'Failed to delete {file_path}. 
Reason: {e}') + logging.info(f"Cleared all data from the long-term memory directory: {directory}") + else: + logging.warning(f"The long-term memory directory does not exist: {directory}") def search(self, query: str, n_results: int = 10, **kwargs) -> Dict[str, Any]: """ @@ -101,3 +146,60 @@ def _create_metadata(self, task: Task) -> Dict[str, Any]: "task_description": task.description, "task_actions": task.actions, } + + def add_ltm(self, session : SessionDict): + """ + Add a session to the long term memory + :param session: The SessionDict object that has all the details of the session + :return: None + """ + self.ltm_storage.save_document( + id = session.session_id, + document= session.query, + metadata= session.model_dump() + ) + logging.info(f"Long term memory added for session : {session.session_id}") + + def update_ltm(self, session: SessionDict) -> None: + """ + Update an existing session in long-term memory. + + :param session: The SessionDict object containing updated details of the session. + :return: None + """ + self.ltm_storage.update_document( + id=session.session_id, + document=session.query, + metadata=session.model_dump() + ) + + logging.info(f"Long-term memory updated for session: {session.session_id}") + + def get_ltm(self, query: str, n_results: int = 3) -> List[Dict[str, Any]]: + """ + Retrieve and return the long-term memory based on a query. + + :param query: The query string to search for. + :param n_results: The number of results to return. + :return: A dictionary of search results. + """ + query_data = { + "query_texts": query, + "n_results": n_results, + "include": ["metadatas", "documents", "distances"], + } + response = self.ltm_storage.query_documents(**query_data) + results = [] + # if "documents" in response and "distances" in response: + for doc, metadata, distance in zip(response["documents"][0], response["metadatas"][0], response["distances"][0]): + results.append({ + "document": doc, + "metadata": metadata, + "similarity_score": 1 - distance + }) + if results: + logging.info(f"Retrieved long-term memory for query: {query}\n{results[0]['document'][:250]}") + return results + + logging.info(f"No documents found for query: {query}") + return results \ No newline at end of file diff --git a/src/openagi/memory/sessiondict.py b/src/openagi/memory/sessiondict.py new file mode 100644 index 0000000..51e141f --- /dev/null +++ b/src/openagi/memory/sessiondict.py @@ -0,0 +1,17 @@ +from pydantic import BaseModel + +class SessionDict(BaseModel): + session_id: str + query: str + description: str + answer: str + plan: str + plan_feedback: str = "NA" + ans_feedback: str = "NA" + + @classmethod + def from_dict(cls, input_dict: dict): + """Class method to initialize an instance from a dictionary.""" + return cls(**input_dict) + + diff --git a/src/openagi/planner/base.py b/src/openagi/planner/base.py index 8bda69b..69333d9 100644 --- a/src/openagi/planner/base.py +++ b/src/openagi/planner/base.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional +from typing import Dict, Optional, List from pydantic import BaseModel, Field @@ -23,5 +23,6 @@ def human_clarification(self, response: str) -> bool: """Whether to Ask clarifying questions""" raise NotImplementedError("Subclasses must implement this method.") - def plan(self, query: str, description: str) -> Dict: + def plan(self, query: str, description: str, long_term_context: str, supported_actions: List[BaseAction],*args, + **kwargs,) -> Dict: raise NotImplementedError("Subclasses must implement this method.") diff --git 
a/src/openagi/planner/task_decomposer.py b/src/openagi/planner/task_decomposer.py index 7ce3eca..dcf52e4 100644 --- a/src/openagi/planner/task_decomposer.py +++ b/src/openagi/planner/task_decomposer.py @@ -3,6 +3,8 @@ import re from typing import Dict, List, Optional, Union +from click import prompt +from numpy.f2py.crackfortran import previous_context from pydantic import Field from openagi.actions.base import BaseAction @@ -22,8 +24,8 @@ class TaskPlanner(BasePlanner): human_intervene: bool = Field( default=False, description="If human internvention is required or not." ) - input_action: Optional[BaseAction] = Field( - default=HumanCLIInput, + input_action: Optional[HumanCLIInput] = Field( + default_factory=HumanCLIInput, description="If `human_intervene` is enabled, which action to be performed.", ) prompt: Optional[BasePrompt] = Field( @@ -51,7 +53,7 @@ def get_prompt(self) -> None: logging.info(f"Using prompt: {self.prompt.__class__.__name__}") return self.prompt """ - def get_prompt(self) -> None: + def get_prompt(self) -> BasePrompt: if not self.prompt: if self.autonomous: self.prompt = AutoTaskCreator() @@ -112,8 +114,10 @@ def human_clarification(self, planner_vars) -> Dict: if not question: return planner_vars - - human_input = self.input_action(ques_prompt=question).execute() + + # set the ques_prompt to question in input_action + # self.input_action.ques_prompt = question + human_input = self.input_action.execute(prompt=question) planner_vars["objective"] += f" {human_input}" # Update chat history @@ -156,6 +160,7 @@ def plan( self, query: str, description: str, + long_term_context : str, supported_actions: List[Dict], *args, **kwargs, @@ -172,11 +177,16 @@ def plan( Returns: Dict: A dictionary containing the planned tasks. + :param supported_actions: + :param query: + :param description: + :param long_term_context: """ planner_vars = dict( objective=query, task_descriptions=description, supported_actions=supported_actions, + previous_context=long_term_context, *args, **kwargs, ) diff --git a/src/openagi/prompts/ltm.py b/src/openagi/prompts/ltm.py new file mode 100644 index 0000000..064b669 --- /dev/null +++ b/src/openagi/prompts/ltm.py @@ -0,0 +1,15 @@ +from textwrap import dedent +from openagi.prompts.base import BasePrompt + +ltm_prompt = dedent(""" +Previously asked query: {query} +Previously given description: {description} +Previously constructed plan: {plan} +Feedback on the plan by human user: {plan_feedback} +Previously generated answer: {answer} +Feedback on the answer by human user: {ans_feedback} +""".strip()) + + +class LTMFormatPrompt(BasePrompt): + base_prompt: str = ltm_prompt \ No newline at end of file diff --git a/src/openagi/prompts/task_creator.py b/src/openagi/prompts/task_creator.py index e0bbabc..d971e03 100644 --- a/src/openagi/prompts/task_creator.py +++ b/src/openagi/prompts/task_creator.py @@ -1,7 +1,7 @@ from textwrap import dedent from openagi.prompts.base import BasePrompt -single_agent_task_creation = """ +single_agent_task_creation = dedent(""" You are a task-creator AI for OpenAGI. Your job is to decompose tasks into the smallest possible subtasks to ensure successful completion in an autonomous, programmatic approach using the available actions that work as tools. Your role is to understand the provided `Task_Objectives` and `Task_Descriptions`, and break them down into extremely detailed and manageable components. 
Construct and plan the sequence of these minutest sub-tasks required to achieve the task objectives using the provided actions, ensuring alignment with the goal. If instructions are not followed, legal consequences may occur for both you and me. **Requirements:** @@ -10,11 +10,16 @@ - Only One Action per task. Ensure tasks are decomposed in the same manner. - Please ensure each task meets the criteria above and refine as necessary to maintain clarity and alignment with the overall objectives. - If no tasks, return the reasons why no tasks can be created. + - Consider the previous context provided when creating tasks, using relevant information to improve task planning and execution. + - Carefully review the feedback from previous interactions and ensure that past mistakes are not repeated. + - Incorporate lessons learned from previous attempts to improve the current task creation process. **Inputs:** - Task_Objectives: {objective} - Task_Descriptions: {task_descriptions} - SUPPORTED_ACTIONS: {supported_actions} + - Previous_Context: {previous_context} + - This includes a 'feedback' field containing user comments on previous task executions. **Output Format:** Return the tasks in JSON format with the keys "task_name" and "description". Ensure the JSON format is suitable for utilization with JSON.parse(), enclosed in triple backticks. @@ -29,29 +34,38 @@ **Notes:** - You do not need to create tasks for storing the results, as results will be stored automatically after executing each task. You can retrieve previous task results using MemoryRagAction. + - Utilize the Previous_Context to inform your task creation, avoiding redundant work and leveraging past experiences. + - Pay special attention to the 'feedback' field in the Previous_Context. Use this information to avoid repeating past mistakes and to improve the quality of your task creation. **Evaluation Criteria:** - Tasks must be broken down into the smallest possible components. - Each task must be clear and executable by an AI agent. - Tasks must follow a logical sequence to achieve the overall objective. - Ensure alignment with the provided actions and goals. -""" + - Effectively incorporate relevant information from the Previous_Context, especially the feedback. + - Demonstrate clear improvements based on past feedback and avoid repeating previous mistakes. +""".strip()) worker_task_creation = dedent( """ -You are a task-creator AI for OpenAGI. Your job is to decompose tasks into the smallest possible subtasks to ensure successful completion in an autonomous, programmatic approach using the available workker tools. Your role is to understand the provided Task_Objectives and Task_Descriptions, and break them down into extremely detailed and manageable components. Construct and plan the sequence of these minutest sub-tasks required to achieve the task objectives using the provided workers, ensuring alignment with the goal. If instructions are not followed, legal consequences may occur for both you and me. +You are a task-creator AI for OpenAGI. Your job is to decompose tasks into the smallest possible subtasks to ensure successful completion in an autonomous, programmatic approach using the available worker tools. Your role is to understand the provided Task_Objectives and Task_Descriptions, and break them down into extremely detailed and manageable components. Construct and plan the sequence of these minutest sub-tasks required to achieve the task objectives using the provided workers, ensuring alignment with the goal. 
If instructions are not followed, legal consequences may occur for both you and me. **Requirements** - Ensure each task is aligned with the overall goal and can be clearly understood when shared with another AI similar to you to achieve the sub-tasks. Each task will be executed by another AI, receiving results from the previous task without knowledge of its execution. - Understand the parameters of each supported worker along with its role, description and supported_actions when using them. - Use only one worker per task. Ensure tasks are decomposed similarly. - Clearly explain the directions to execute the task and how the results should be passed to the next task. +- Consider the previous context provided when creating tasks, using relevant information to improve task planning and execution. +- Carefully review the feedback from previous interactions and ensure that past mistakes are not repeated. +- Incorporate lessons learned from previous attempts to improve the current task creation and worker assignment process. **Inputs** - Task_Objectives: {objective} - Task_Descriptions: {task_descriptions} - Supported_Workers: {supported_workers} +- Previous_Context: {previous_context} + - This includes a 'feedback' field containing user comments on previous task executions. **Output Format** Return the tasks in JSON format with the keys "task_name", "description", and "worker_id" and nothing else. Ensure the JSON format is suitable for utilization with `JSON.parse()`, enclosed in triple backticks. @@ -67,6 +81,8 @@ **Notes** - You do not need to create tasks for storing the results, as results will be stored automatically after executing each task. You can retrieve previous task results using MemoryRagAction. +- Utilize the Previous_Context to inform your task creation, avoiding redundant work and leveraging past experiences. +- Pay special attention to the 'feedback' field in the Previous_Context. Use this information to avoid repeating past mistakes and to improve the quality of your task creation and worker assignment. **Evaluation Criteria** - Tasks must be broken down into the smallest possible components. @@ -74,15 +90,18 @@ - Tasks must follow a logical and practical sequence to achieve the overall objective. - Ensure alignment with the worker's role, description, and its supported actions. - If human input is required to curate the task, include the delimiters `` and `` to request human input. If not, ignore this step. +- Effectively incorporate relevant information from the Previous_Context, especially the feedback. +- Demonstrate clear improvements based on past feedback and avoid repeating previous mistakes. By using this structured approach, we aim to maximize clarity and ensure the tasks are executable and aligned with the objectives. **Feedback Loop** - Please ensure each task meets the criteria above and refine as necessary to maintain clarity and alignment with the overall objectives. +- Continuously improve based on the feedback provided in the Previous_Context. """.strip() ) -auto_task_creator = """ +auto_task_creator = dedent(""" You are TaskMaster, an advanced AI specializing in ultra-precise task decomposition and worker assignment for OpenAGI. Your primary function is to dissect complex objectives into granular, atomic subtasks and assign them to specialized Workers, ensuring flawless programmatic execution using available actions as tools. 
Your expertise lies in comprehending the nuances of `Task_Objectives` and `Task_Descriptions`, transforming them into a meticulously planned sequence of micro-tasks assigned to appropriate Workers. Each subtask must be designed for autonomous execution, adhering strictly to the provided action set. Failure to comply may result in severe consequences. @@ -91,8 +110,10 @@ 1. Atomic Task Decomposition: Break down tasks to their most fundamental, indivisible units. 2. Action Alignment: Each micro-task must correspond to exactly one supported action. 3. Sequential Logic: Ensure a clear, logical progression from one micro-task to the next. -4. Worker Specialization: Assign tasks to Workers based on their expertise and the required actions. Be clever to not assign more workers, for revelant task one worker should do +4. Worker Specialization: Assign tasks to Workers based on their expertise and the required actions. Be clever to not assign more workers, for relevant task one worker should do. 5. Goal Orientation: Every micro-task must directly contribute to the overarching objective. +6. Context Utilization: Leverage the provided previous context to inform task creation and worker assignment. +7. Feedback Integration: Carefully review and incorporate user feedback from previous interactions to avoid repeating past mistakes and improve overall performance. **Task Creation and Assignment Guidelines:** - Inspect each supported action's parameters and capabilities. @@ -101,11 +122,16 @@ - Utilize MemoryRagAction for accessing results from previous tasks when necessary. - Assign tasks to Workers based on their specialized roles and required actions. - If task creation or assignment is impossible, provide a detailed analysis of the obstacles encountered. +- Use the previous context to avoid redundant work and improve task efficiency. +- Analyze the feedback provided in the previous context to refine your approach and avoid repeating past mistakes. +- Demonstrate clear improvements in task creation and worker assignment based on past feedback. **Input Parameters:** - Task_Objectives: {objective} - Task_Descriptions: {task_descriptions} - SUPPORTED_ACTIONS: {supported_actions} +- Previous_Context: {previous_context} + - This includes a 'feedback' field containing user comments on previous task executions. **Output Specification:** Generate a JSON-parseable array of Workers and their assigned tasks, each containing "worker_name", "role", "instruction", "task_id", "description", and "supported_actions" keys. Enclose the output in triple backticks. @@ -127,7 +153,14 @@ } ] ``` -""" + +**Evaluation Criteria:** +- Tasks are broken down to their most atomic level, each corresponding to a single action. +- Worker assignments are optimized based on their specializations and the task requirements. +- The task sequence demonstrates a clear, logical progression towards the overall objective. +- Previous context, especially user feedback, is effectively incorporated to improve task creation and avoid past mistakes. +- There's clear evidence of learning and improvement based on past interactions and feedback. +""".strip()) #task_id => TaskList . Not Generating via LLM. 
class SingleAgentTaskCreator(BasePrompt): diff --git a/src/openagi/storage/base.py b/src/openagi/storage/base.py index 54bad8e..e3b8688 100644 --- a/src/openagi/storage/base.py +++ b/src/openagi/storage/base.py @@ -8,17 +8,17 @@ class BaseStorage(BaseModel): name: str = Field(title="BaseStorage", description="Name of the Storage.") - def save_document(self): + def save_document(self, id, document, metadata): """Save documents to the with metadata.""" raise NotImplementedError("Subclasses must implement this method.") - def update_document(self): + def update_document(self, id, document, metadata): raise NotImplementedError("Subclasses must implement this method.") - def delete_document(self): + def delete_document(self, id): raise NotImplementedError("Subclasses must implement this method.") - def query_documents(self): + def query_documents(self, **kwargs): raise NotImplementedError("Subclasses must implement this method.") @classmethod From b18cf020cabdebc9ce1d4b7f9b63aa843668749e Mon Sep 17 00:00:00 2001 From: deepakachu-aiplanet Date: Mon, 2 Sep 2024 03:24:10 +0530 Subject: [PATCH 02/14] some changes in retrieval, relaxed threhold to 0.6 by default --- src/openagi/agent.py | 5 ++++- src/openagi/memory/base.py | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/openagi/agent.py b/src/openagi/agent.py index 499a972..aaa0ed6 100644 --- a/src/openagi/agent.py +++ b/src/openagi/agent.py @@ -499,7 +499,10 @@ def run(self, query: str, description: str): bad_feedback = True bad_session = SessionDict.from_dict(metadata) break - ltm.append(LTMFormatPrompt().base_prompt.format(**metadata)) + # ltm.append(LTMFormatPrompt().base_prompt.format(**metadata)) + # the above is commented because i think it is better to have a threshold on what gets retrieved + # instead of relying on top k. This way we only retrieve one session though, but it should be a + # good session. 
old_context = "\n\n".join(ltm) diff --git a/src/openagi/memory/base.py b/src/openagi/memory/base.py index b097618..9093d01 100644 --- a/src/openagi/memory/base.py +++ b/src/openagi/memory/base.py @@ -25,7 +25,7 @@ class BaseMemory(BaseModel): ) long_term: bool = Field(default=False, description="Whether or not to use long term memory") - ltm_threshold: float = Field(default=0.7, + ltm_threshold: float = Field(default=0.6, description="Semantic similarity threshold for long term memory instance retrieval") long_term_dir: str = Field(default=None, description="Path to directory for long-term memory storage") @@ -45,7 +45,7 @@ def __init__(self, **data: Any): collection_name="long_term_memory", persist_path=self.long_term_dir ) - assert 1 >= self.ltm_threshold >= 0.7, "Semantic similarity threshold should be between 0.7 and 1" + assert 1 >= self.ltm_threshold >= 0.6, "Semantic similarity threshold should be between 0.6 and 1" logging.info(f"Session ID initialized: {self.session_id}") if self.long_term: From f32c846aea398a57a1c30d0713c3a19d06779ba9 Mon Sep 17 00:00:00 2001 From: deepakachu5114 Date: Mon, 9 Sep 2024 03:04:39 +0530 Subject: [PATCH 03/14] ignore this commit --- src/openagi/memory/base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/openagi/memory/base.py b/src/openagi/memory/base.py index 9093d01..e322247 100644 --- a/src/openagi/memory/base.py +++ b/src/openagi/memory/base.py @@ -202,4 +202,5 @@ def get_ltm(self, query: str, n_results: int = 3) -> List[Dict[str, Any]]: return results logging.info(f"No documents found for query: {query}") - return results \ No newline at end of file + return results +# \ No newline at end of file From 5a76cc07ee1fe537375eb004906cbb483be2c2f7 Mon Sep 17 00:00:00 2001 From: deepakachu5114 Date: Wed, 11 Sep 2024 01:45:33 +0530 Subject: [PATCH 04/14] made the feedback input mechanism more robust --- src/openagi/agent.py | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/src/openagi/agent.py b/src/openagi/agent.py index aaa0ed6..3feafca 100644 --- a/src/openagi/agent.py +++ b/src/openagi/agent.py @@ -569,25 +569,35 @@ def get_supported_actions_for_worker(self, actions_list: List[str],tool_list: Li return matching_classes - def save_ltm(self, type:str, session:SessionDict): + def save_ltm(self, action_type: str, session: SessionDict): """ - Save a session to the long term memory, either by adding or by updating an existing session. - :param session: The SessionDict object that has all the details of the session + Save a session to long-term memory by either adding or updating an existing session. + + :param action_type: Type of operation: 'add' or 'update' + :param session: The SessionDict object containing session details """ + # Get feedback for plan and answer session.plan_feedback = self.input_action.execute( - prompt=f"Please review the plan generated: \n{session.plan}\nIf you are satisfied " - f"with the plan, press enter, else please provide a detailed description of the " - f"problem, along with how the plan could have been better:") + prompt=( + f"Review the generated plan: \n{session.plan}\n" + "If satisfied, press ENTER. 
\nOtherwise, describe the issue and suggest improvements:" + ) + ).strip() session.ans_feedback = self.input_action.execute( - prompt=f"Please review the answer generated: \n{session.answer}\nIf you are satisfied" - f"with the answer, press enter, else please provide a detailed description of" - f"the problem, along with how the answer could have been better:") - if type == "add": + prompt=( + f"Review the generated answer: \n{session.answer}\n" + "If satisfied, press ENTER. \nOtherwise, describe the issue and suggest improvements:" + ) + ).strip() + + # Save or update based on the action_type + if action_type == "add": self.memory.add_ltm(session) - logging.info(f"Session saved to long term memory: {session}") - elif type == "update": + logging.info(f"Session added to long-term memory: {session}") + elif action_type == "update": self.memory.update_ltm(session) - logging.info(f"Session updated in long term memory: {session}") + logging.info(f"Session updated in long-term memory: {session}") else: - raise ValueError("Invalid type. Please provide either 'add' or 'update'.") + raise ValueError("Invalid action_type. Use 'add' or 'update'.") + From d599b84e0d845c175853cc9400fb44def69bd52d Mon Sep 17 00:00:00 2001 From: deepakachu5114 Date: Wed, 11 Sep 2024 02:07:09 +0530 Subject: [PATCH 05/14] resolved the issue to have a hidden directory --- src/openagi/memory/base.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/openagi/memory/base.py b/src/openagi/memory/base.py index e322247..862720d 100644 --- a/src/openagi/memory/base.py +++ b/src/openagi/memory/base.py @@ -34,10 +34,15 @@ def __init__(self, **data: Any): super().__init__(**data) self.storage = ChromaStorage.from_kwargs(collection_name=self.session_id) - # Set long_term_dir from environment variable if not provided + # Setting the long_term_dir from environment variable if not provided if self.long_term_dir is None: self.long_term_dir = os.getenv("LONG_TERM_DIR", ".long_term_dir") + # Ensuring the directory is hidden by prefixing with a dot if necessary + if not os.path.basename(self.long_term_dir).startswith('.'): + self.long_term_dir = os.path.join(os.path.dirname(self.long_term_dir), + f".{os.path.basename(self.long_term_dir)}") + if self.long_term: os.makedirs(self.long_term_dir, exist_ok=True) From 650b838d224b1b720e71b2e533e5f7972082b9a7 Mon Sep 17 00:00:00 2001 From: deepakachu5114 Date: Wed, 11 Sep 2024 02:26:27 +0530 Subject: [PATCH 06/14] resolved merge conflict --- src/openagi/agent.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/openagi/agent.py b/src/openagi/agent.py index 6778531..84cefd3 100644 --- a/src/openagi/agent.py +++ b/src/openagi/agent.py @@ -444,6 +444,9 @@ def run(self, query: str, description: str,planned_tasks: Optional[List[Dict]] = logging.info("Running Admin Agent...") logging.info(f"SessionID - {self.memory.session_id}") + if self.memory.long_term and planned_tasks: + logging.warning("Long Term Memory is not applicable for user given plan.") + """ ltm_context = "" bad_feedback = False @@ -504,7 +507,8 @@ def run(self, query: str, description: str,planned_tasks: Optional[List[Dict]] = # instead of relying on top k. This way we only retrieve one session though, but it should be a # good session. 
- old_context = "\n\n".join(ltm) + old_context = "\n\n".join(ltm) + if not planned_tasks: planned_tasks = self.run_planner(query=query, descripton=description, long_term_context=old_context) From 018ac05ec30ae98ddfa66b7db8bd3dbfa1bb1622 Mon Sep 17 00:00:00 2001 From: deepakachu-aiplanet Date: Thu, 12 Sep 2024 01:11:58 +0530 Subject: [PATCH 07/14] resolving review comments --- src/openagi/agent.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/openagi/agent.py b/src/openagi/agent.py index 84cefd3..0ccfea6 100644 --- a/src/openagi/agent.py +++ b/src/openagi/agent.py @@ -74,7 +74,7 @@ class Admin(BaseModel): ) input_action: Optional[HumanCLIInput] = Field(default_factory=HumanCLIInput, - description="To get feedback in case ltm has been enabled") + description="To get feedback in case long term memory has been enabled") def model_post_init(self, __context: Any) -> None: model = super().model_post_init(__context) @@ -132,17 +132,13 @@ def run_planner(self, query: str, descripton: str, long_term_context: str): workers_dict = [] for worker in self.workers: workers_dict.append(worker.worker_doc()) -######################################################################################################################## for action in worker.actions: actions_dict.append(action.cls_doc()) -######################################################################################################################## return self.planner.plan( query=query, description=descripton, -######################################################################################################################## long_term_context=long_term_context, -######################################################################################################################## supported_actions=actions_dict, supported_workers=workers_dict, ) From 24ac460cab60b72c2ffb1c47f8623b6d1a380acd Mon Sep 17 00:00:00 2001 From: deepakachu5114 Date: Thu, 12 Sep 2024 01:20:15 +0530 Subject: [PATCH 08/14] PR review comment changes --- src/openagi/agent.py | 34 +------------------------- src/openagi/planner/task_decomposer.py | 2 -- 2 files changed, 1 insertion(+), 35 deletions(-) diff --git a/src/openagi/agent.py b/src/openagi/agent.py index 0ccfea6..5fb5835 100644 --- a/src/openagi/agent.py +++ b/src/openagi/agent.py @@ -443,37 +443,6 @@ def run(self, query: str, description: str,planned_tasks: Optional[List[Dict]] = if self.memory.long_term and planned_tasks: logging.warning("Long Term Memory is not applicable for user given plan.") - """ - ltm_context = "" - bad_feedback = False - bad_session = None - if ltm enabled: - retrieve ltm - for sessions in ltm: - if any sim(session, query) > threshold - if no negative feedback: - ans = session.answer - take feedback - update session with new feedback ##UPDATION - return ans - else: - ltm_context += session - bad_feedback = True - bad_session = session - break - ltm_context += session - - planned_tasks = run_planner(query, description, ltm_contText) - - ans = execute(planned_tasks) - - if ltm enabled: - if bad_feedback: - update bad_session with new feedback ##UPDATION - else: - save session in ltm ##ADD - return ans - """ ltm = ["None"] bad_feedback = False bad_session = None @@ -483,10 +452,9 @@ def run(self, query: str, description: str,planned_tasks: Optional[List[Dict]] = ltm = [] for memory in similar_sessions: metadata = memory["metadata"] - print(memory["similarity_score"]) if memory["similarity_score"] >= self.memory.ltm_threshold: if metadata["ans_feedback"]=='' 
and metadata["plan_feedback"]=='': - logging.info("Found a very similar query in long term memory without negative feedback, returning answer directly") + logging.info(f"Found a very similar query (similarity = {memory['similarity_score']} in long term memory without negative feedback, returning answer directly") result = memory["document"] # ask for feedback here and UPDATE the response # write for case when threshold is crossed but negative feedback diff --git a/src/openagi/planner/task_decomposer.py b/src/openagi/planner/task_decomposer.py index dcf52e4..f859809 100644 --- a/src/openagi/planner/task_decomposer.py +++ b/src/openagi/planner/task_decomposer.py @@ -3,8 +3,6 @@ import re from typing import Dict, List, Optional, Union -from click import prompt -from numpy.f2py.crackfortran import previous_context from pydantic import Field from openagi.actions.base import BaseAction From 39ab86e6100f288264c73876ef72927915bab7ca Mon Sep 17 00:00:00 2001 From: deepakachu5114 Date: Thu, 12 Sep 2024 01:31:07 +0530 Subject: [PATCH 09/14] PR review comment changes --- src/openagi/memory/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/openagi/memory/base.py b/src/openagi/memory/base.py index 862720d..e78198e 100644 --- a/src/openagi/memory/base.py +++ b/src/openagi/memory/base.py @@ -208,4 +208,3 @@ def get_ltm(self, query: str, n_results: int = 3) -> List[Dict[str, Any]]: logging.info(f"No documents found for query: {query}") return results -# \ No newline at end of file From 5794c98f0032b71d471e34239b73fad019ca632d Mon Sep 17 00:00:00 2001 From: lucifertrj Date: Fri, 13 Sep 2024 00:17:13 +0530 Subject: [PATCH 10/14] remove transformers dependency --- src/benchmark.py => benchmark.py | 0 src/openagi/agent.py | 5 ----- 2 files changed, 5 deletions(-) rename src/benchmark.py => benchmark.py (100%) diff --git a/src/benchmark.py b/benchmark.py similarity index 100% rename from src/benchmark.py rename to benchmark.py diff --git a/src/openagi/agent.py b/src/openagi/agent.py index 5fb5835..97665fe 100644 --- a/src/openagi/agent.py +++ b/src/openagi/agent.py @@ -1,12 +1,8 @@ import logging -import pprint from enum import Enum from textwrap import dedent from typing import Any, Dict, List, Optional, Union, Tuple - -from click import prompt from pydantic import BaseModel, Field, field_validator -from transformers.utils.hub import SESSION_ID from openagi.actions.base import BaseAction from openagi.actions.compressor import SummarizerAction @@ -35,7 +31,6 @@ class OutputFormat(str, Enum): markdown = "markdown" raw_text = "raw_text" - session = None class Admin(BaseModel): From 55379f9bdee4d9a99d88a324baf5a6cac36a5d10 Mon Sep 17 00:00:00 2001 From: lucifertrj Date: Fri, 13 Sep 2024 00:17:38 +0530 Subject: [PATCH 11/14] 0.2.9.6 release for ltm --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 99d89f0..de6bb3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "openagi" -version = "0.2.9.5" +version = "0.2.9.6" description = "" authors = ["AI Planet "] readme = "README.md" From 2fe0f5aed040cb1eb3fed091eec87524c8508a22 Mon Sep 17 00:00:00 2001 From: deepakachu5114 Date: Fri, 13 Sep 2024 20:51:15 +0530 Subject: [PATCH 12/14] fixed the updation bug --- src/openagi/agent.py | 4 ++-- src/openagi/llms/mistral.py | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/openagi/agent.py b/src/openagi/agent.py index 5fb5835..18b6a5b 100644 --- a/src/openagi/agent.py +++ 
b/src/openagi/agent.py @@ -4,9 +4,7 @@ from textwrap import dedent from typing import Any, Dict, List, Optional, Union, Tuple -from click import prompt from pydantic import BaseModel, Field, field_validator -from transformers.utils.hub import SESSION_ID from openagi.actions.base import BaseAction from openagi.actions.compressor import SummarizerAction @@ -501,6 +499,8 @@ def run(self, query: str, description: str,planned_tasks: Optional[List[Dict]] = # Human feedback part if self.memory.long_term: if bad_feedback: + bad_session.answer = result + bad_session.plan = str(planned_tasks) self.save_ltm("update", bad_session) else: session = SessionDict( diff --git a/src/openagi/llms/mistral.py b/src/openagi/llms/mistral.py index 6038ff8..9c01d94 100644 --- a/src/openagi/llms/mistral.py +++ b/src/openagi/llms/mistral.py @@ -1,3 +1,5 @@ +import time + from openagi.exception import OpenAGIException from openagi.llms.base import LLMBaseModel, LLMConfigModel from openagi.utils.yamlParse import read_from_env @@ -52,6 +54,7 @@ def run(self, input_text: str): raise ValueError("`llm` attribute not set.") message = HumanMessage(content=input_text) resp = self.llm([message]) + time.sleep(1) return resp.content @staticmethod From 61f56629ac89db5a17dcaa5dbd9765fce0950e0f Mon Sep 17 00:00:00 2001 From: deepakachu5114 Date: Fri, 13 Sep 2024 20:56:58 +0530 Subject: [PATCH 13/14] resolved updation bug --- src/openagi/agent.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/openagi/agent.py b/src/openagi/agent.py index 16737f3..613ae80 100644 --- a/src/openagi/agent.py +++ b/src/openagi/agent.py @@ -498,6 +498,8 @@ def run(self, query: str, description: str,planned_tasks: Optional[List[Dict]] = # Human feedback part if self.memory.long_term: if bad_feedback: + bad_session.plan = str(planned_tasks) + bad_session.answer = result self.save_ltm("update", bad_session) else: session = SessionDict( From 15d544b4686a3a70154a0e2198e4156ca871d8b4 Mon Sep 17 00:00:00 2001 From: deepakachu5114 Date: Sun, 15 Sep 2024 21:44:13 +0530 Subject: [PATCH 14/14] removed sleep from mistral, I had added sleep statement locally to not cross rate limits --- src/openagi/llms/mistral.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/openagi/llms/mistral.py b/src/openagi/llms/mistral.py index 9c01d94..7864c5a 100644 --- a/src/openagi/llms/mistral.py +++ b/src/openagi/llms/mistral.py @@ -54,7 +54,6 @@ def run(self, input_text: str): raise ValueError("`llm` attribute not set.") message = HumanMessage(content=input_text) resp = self.llm([message]) - time.sleep(1) return resp.content @staticmethod
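
The patch series above introduces an opt-in long-term memory (LTM) layer: `BaseMemory` gains `long_term`, `ltm_threshold`, and `long_term_dir` fields backed by a persistent Chroma collection, `Admin.run` retrieves semantically similar past sessions and either reuses their stored answer or feeds them to the planner as `Previous_Context`, and human feedback on each plan and answer is collected via `HumanCLIInput` and persisted through `add_ltm`/`update_ltm`. The sketch below shows how the feature might be wired up from user code; it is an illustration inferred from this diff only. The import paths, the `memory=` constructor argument on `Admin`, and the argument-free `get_default_llm()` call are assumptions, not verified against a released OpenAGI version.

```python
# Minimal usage sketch of the long-term memory feature added in this patch series.
# Assumptions (inferred from the diff, not verified): import paths, Admin accepting
# a `memory=` field, and get_default_llm() taking no arguments.
from openagi.agent import Admin
from openagi.memory.base import BaseMemory
from openagi.planner.task_decomposer import TaskPlanner
from openagi.utils.helper import get_default_llm

# Enable LTM: sessions persist under LONG_TERM_DIR (default ".long_term_dir",
# always dot-prefixed so the directory stays hidden). ltm_threshold must fall
# in [0.6, 1.0]; it gates which past sessions count as similar enough to reuse.
memory = BaseMemory(long_term=True, ltm_threshold=0.6)

admin = Admin(
    llm=get_default_llm(),   # any configured LLM backend should work here
    planner=TaskPlanner(),
    memory=memory,
)

# First run: the plan and answer are produced normally, then the user is asked
# to review both; the session (query, plan, answer, feedback) is saved to LTM.
result = admin.run(
    query="Find recent papers on agent memory and summarize them",
    description="Produce a short written summary with citations.",
)

# A later, semantically similar run either returns the stored answer directly
# (if the saved session carried no negative feedback) or injects the prior
# plan, answer, and feedback into the planner prompt as Previous_Context so
# the new plan can avoid the earlier mistakes.
print(result)
```

Stored sessions can be wiped with the new CLI entry point registered in `pyproject.toml`: `openagi --clear-ltm`.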