From a207640ff2015012e54e08b1dfc8c1c9ba46502d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=8E=E5=A4=A9?= <460342015@qq.com> Date: Thu, 14 Mar 2024 14:38:10 +0800 Subject: [PATCH] fix(agent): Fix agent loss message bug (#1283) --- dbgpt/agent/actions/plugin_action.py | 4 +- dbgpt/agent/agents/agent_new.py | 47 ++++++ dbgpt/agent/agents/base_agent_new.py | 137 +++++++++++------- dbgpt/agent/memory/gpts_memory.py | 91 ++++++++++-- dbgpt/agent/resource/resource_api.py | 7 +- dbgpt/serve/agent/agents/controller.py | 42 ++++-- .../resource_loader/plugin_hub_load_client.py | 11 +- .../serve/agent/team/layout/agent_operator.py | 12 +- .../agent/team/layout/team_awel_layout_new.py | 14 +- dbgpt/serve/agent/team/plan/team_auto_plan.py | 4 +- 10 files changed, 278 insertions(+), 91 deletions(-) diff --git a/dbgpt/agent/actions/plugin_action.py b/dbgpt/agent/actions/plugin_action.py index ab402e451..715d7c84c 100644 --- a/dbgpt/agent/actions/plugin_action.py +++ b/dbgpt/agent/actions/plugin_action.py @@ -83,10 +83,10 @@ async def a_run( if not resource_plugin_client: raise ValueError("No implementation of the use of plug-in resources!") response_success = True - status = Status.TODO.value + status = Status.RUNNING.value + tool_result = "" err_msg = None try: - status = Status.RUNNING.value tool_result = await resource_plugin_client.a_execute_command( param.tool_name, param.args, plugin_generator ) diff --git a/dbgpt/agent/agents/agent_new.py b/dbgpt/agent/agents/agent_new.py index bcacf385a..32259ef74 100644 --- a/dbgpt/agent/agents/agent_new.py +++ b/dbgpt/agent/agents/agent_new.py @@ -4,6 +4,12 @@ from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, Tuple, Union +from dbgpt.agent.resource.resource_loader import ResourceLoader +from dbgpt.core import LLMClient +from dbgpt.util.annotations import PublicAPI + +from ..memory.gpts_memory import GptsMemory + class Agent(ABC): async def a_send( @@ -72,6 +78,8 @@ async def a_review( async def a_act( self, message: Optional[str], + sender: Optional[Agent] = None, + reviewer: Optional[Agent] = None, **kwargs, ) -> Union[str, Dict, None]: """ @@ -101,3 +109,42 @@ async def a_verify( Returns: """ + + +@dataclasses.dataclass +class AgentContext: + conv_id: str + gpts_app_name: str = None + language: str = None + max_chat_round: Optional[int] = 100 + max_retry_round: Optional[int] = 10 + max_new_tokens: Optional[int] = 1024 + temperature: Optional[float] = 0.5 + allow_format_str_template: Optional[bool] = False + + def to_dict(self) -> Dict[str, Any]: + return dataclasses.asdict(self) + + +@dataclasses.dataclass +@PublicAPI(stability="beta") +class AgentGenerateContext: + """A class to represent the input of a Agent.""" + + message: Optional[Dict] + sender: Agent + reviewer: Agent + silent: Optional[bool] = False + + rely_messages: List[Dict] = dataclasses.field(default_factory=list) + final: Optional[bool] = True + + memory: Optional[GptsMemory] = None + agent_context: Optional[AgentContext] = None + resource_loader: Optional[ResourceLoader] = None + llm_client: Optional[LLMClient] = None + + round_index: int = None + + def to_dict(self) -> Dict: + return dataclasses.asdict(self) diff --git a/dbgpt/agent/agents/base_agent_new.py b/dbgpt/agent/agents/base_agent_new.py index 2109c95e4..f8ac83934 100644 --- a/dbgpt/agent/agents/base_agent_new.py +++ b/dbgpt/agent/agents/base_agent_new.py @@ -8,8 +8,7 @@ from pydantic import BaseModel, Field from dbgpt.agent.actions.action import Action, ActionOutput -from dbgpt.agent.agents.agent import AgentContext -from dbgpt.agent.agents.agent_new import Agent +from dbgpt.agent.agents.agent_new import Agent, AgentContext from dbgpt.agent.agents.llm.llm import LLMConfig, LLMStrategyType from dbgpt.agent.agents.llm.llm_client import AIWrapper from dbgpt.agent.agents.role import Role @@ -31,7 +30,7 @@ class ConversableAgent(Role, Agent): llm_config: Optional[LLMConfig] = None memory: GptsMemory = Field(default_factory=GptsMemory) resource_loader: Optional[ResourceLoader] = None - max_retry_count: int = 10 + max_retry_count: int = 3 consecutive_auto_reply_counter: int = 0 llm_client: Optional[AIWrapper] = None oai_system_message: List[Dict] = Field(default_factory=list) @@ -178,54 +177,75 @@ async def a_generate_reply( logger.info( f"generate agent reply!sender={sender}, rely_messages_len={rely_messages}" ) + try: + reply_message = self._init_reply_message(recive_message=recive_message) + await self._system_message_assembly( + recive_message["content"], reply_message.get("context", None) + ) - reply_message = self._init_reply_message(recive_message=recive_message) - await self._system_message_assembly( - recive_message["content"], reply_message.get("context", None) - ) + fail_reason = None + current_retry_counter = 0 + is_sucess = True + while current_retry_counter < self.max_retry_count: + if current_retry_counter > 0: + retry_message = self._init_reply_message( + recive_message=recive_message + ) + retry_message["content"] = fail_reason + retry_message["current_goal"] = recive_message.get( + "current_goal", None + ) + # The current message is a self-optimized message that needs to be recorded. + # It is temporarily set to be initiated by the originating end to facilitate the organization of historical memory context. + await sender.a_send( + retry_message, self, reviewer, request_reply=False + ) - fail_reason = None - current_retry_counter = 0 - is_sucess = True - while current_retry_counter < self.max_retry_count: - if current_retry_counter > 0: - retry_message = self._init_reply_message(recive_message=recive_message) - retry_message["content"] = fail_reason - # The current message is a self-optimized message that needs to be recorded. - # It is temporarily set to be initiated by the originating end to facilitate the organization of historical memory context. - await sender.a_send(retry_message, self, reviewer, request_reply=False) - - # 1.Think about how to do things - llm_reply, model_name = await self.a_thinking( - self._load_thinking_messages(recive_message, sender, rely_messages) - ) - reply_message["model_name"] = model_name - reply_message["content"] = llm_reply - - # 2.Review whether what is being done is legal - approve, comments = await self.a_review(llm_reply, self) - reply_message["review_info"] = {"approve": approve, "comments": comments} - - # 3.Act based on the results of your thinking - act_extent_param = self.prepare_act_param() - act_out: ActionOutput = await self.a_act( - message=llm_reply, - **act_extent_param, - ) - reply_message["action_report"] = act_out.dict() - - # 4.Reply information verification - check_paas, reason = await self.a_verify(reply_message, sender, reviewer) - is_sucess = check_paas - # 5.Optimize wrong answers myself - if not check_paas: - current_retry_counter += 1 - # Send error messages and issue new problem-solving instructions - await self.a_send(reply_message, sender, reviewer, request_reply=False) - fail_reason = reason - else: - break - return is_sucess, reply_message + # 1.Think about how to do things + llm_reply, model_name = await self.a_thinking( + self._load_thinking_messages(recive_message, sender, rely_messages) + ) + reply_message["model_name"] = model_name + reply_message["content"] = llm_reply + + # 2.Review whether what is being done is legal + approve, comments = await self.a_review(llm_reply, self) + reply_message["review_info"] = { + "approve": approve, + "comments": comments, + } + + # 3.Act based on the results of your thinking + act_extent_param = self.prepare_act_param() + act_out: ActionOutput = await self.a_act( + message=llm_reply, + sender=sender, + reviewer=reviewer, + **act_extent_param, + ) + reply_message["action_report"] = act_out.dict() + + # 4.Reply information verification + check_paas, reason = await self.a_verify( + reply_message, sender, reviewer + ) + is_sucess = check_paas + # 5.Optimize wrong answers myself + if not check_paas: + current_retry_counter += 1 + # Send error messages and issue new problem-solving instructions + if current_retry_counter < self.max_retry_count: + await self.a_send( + reply_message, sender, reviewer, request_reply=False + ) + fail_reason = reason + else: + break + return is_sucess, reply_message + + except Exception as e: + logger.exception("Generate reply exception!") + return False, {"content": str(e)} async def a_thinking( self, messages: Optional[List[Dict]], prompt: Optional[str] = None @@ -265,7 +285,13 @@ async def a_review( ) -> Tuple[bool, Any]: return True, None - async def a_act(self, message: Optional[str], **kwargs) -> Optional[ActionOutput]: + async def a_act( + self, + message: Optional[str], + sender: Optional[ConversableAgent] = None, + reviewer: Optional[ConversableAgent] = None, + **kwargs, + ) -> Optional[ActionOutput]: last_out = None for action in self.actions: # Select the resources required by acton @@ -335,6 +361,7 @@ async def a_initiate_chat( ####################################################################### def _init_actions(self, actions: List[Action] = None): + self.actions = [] for idx, action in enumerate(actions): if not isinstance(action, Action): self.actions.append(action()) @@ -426,7 +453,9 @@ async def _system_message_assembly( for item in self.resources: resource_client = self.resource_loader.get_resesource_api(item.type) resource_prompt_list.append( - await resource_client.get_resource_prompt(item, qustion) + await resource_client.get_resource_prompt( + self.agent_context.conv_id, item, qustion + ) ) if context is None: context = {} @@ -525,7 +554,11 @@ def _convert_to_ai_message( content = item.content if item.action_report: action_out = ActionOutput.from_dict(json.loads(item.action_report)) - if action_out is not None and action_out.content is not None: + if ( + action_out is not None + and action_out.is_exe_success + and action_out.content is not None + ): content = action_out.content oai_messages.append( { diff --git a/dbgpt/agent/memory/gpts_memory.py b/dbgpt/agent/memory/gpts_memory.py index 94c9af19e..c66386271 100644 --- a/dbgpt/agent/memory/gpts_memory.py +++ b/dbgpt/agent/memory/gpts_memory.py @@ -1,15 +1,18 @@ from __future__ import annotations import json -from collections import defaultdict +from collections import OrderedDict, defaultdict from typing import Dict, List, Optional +from dbgpt.agent.actions.action import ActionOutput from dbgpt.util.json_utils import EnhancedJSONEncoder from dbgpt.vis.client import VisAgentMessages, VisAgentPlans, vis_client from .base import GptsMessage, GptsMessageMemory, GptsPlansMemory from .default_gpts_memory import DefaultGptsMessageMemory, DefaultGptsPlansMemory +NONE_GOAL_PREFIX: str = "none_goal_count_" + class GptsMemory: def __init__( @@ -32,6 +35,41 @@ def plans_memory(self): def message_memory(self): return self._message_memory + async def _message_group_vis_build(self, message_group): + if not message_group: + return "" + num: int = 0 + last_goal = next(reversed(message_group)) + last_goal_messages = message_group[last_goal] + + last_goal_message = last_goal_messages[-1] + vis_items = [] + + plan_temps = [] + for key, value in message_group.items(): + num = num + 1 + if key.startswith(NONE_GOAL_PREFIX): + vis_items.append(await self._messages_to_plan_vis(plan_temps)) + plan_temps = [] + num = 0 + vis_items.append(await self._messages_to_agents_vis(value)) + else: + num += 1 + plan_temps.append( + { + "name": key, + "num": num, + "status": "complete", + "agent": value[0].receiver if value else "", + "markdown": await self._messages_to_agents_vis(value), + } + ) + + if len(plan_temps) > 0: + vis_items.append(await self._messages_to_plan_vis(plan_temps)) + vis_items.append(await self._messages_to_agents_vis([last_goal_message])) + return "\n".join(vis_items) + async def _plan_vis_build(self, plan_group: dict[str, list]): num: int = 0 plan_items = [] @@ -48,6 +86,37 @@ async def _plan_vis_build(self, plan_group: dict[str, list]): ) return await self._messages_to_plan_vis(plan_items) + async def one_chat_competions_v2(self, conv_id: str): + messages = self.message_memory.get_by_conv_id(conv_id=conv_id) + temp_group = OrderedDict() + none_goal_count = 1 + count: int = 0 + for message in messages: + count = count + 1 + if count == 1: + continue + current_gogal = message.current_goal + + last_goal = next(reversed(temp_group)) if temp_group else None + if last_goal: + last_goal_messages = temp_group[last_goal] + if current_gogal: + if current_gogal == last_goal: + last_goal_messages.append(message) + else: + temp_group[current_gogal] = [message] + else: + temp_group[f"{NONE_GOAL_PREFIX}{none_goal_count}"] = [message] + none_goal_count += 1 + else: + if current_gogal: + temp_group[current_gogal] = [message] + else: + temp_group[f"{NONE_GOAL_PREFIX}{none_goal_count}"] = [message] + none_goal_count += 1 + + return await self._message_group_vis_build(temp_group) + async def one_chat_competions(self, conv_id: str): messages = self.message_memory.get_by_conv_id(conv_id=conv_id) temp_group = defaultdict(list) @@ -76,12 +145,14 @@ async def one_chat_competions(self, conv_id: str): vis_items.append(await self._plan_vis_build(temp_group)) temp_group.clear() if len(temp_messages) > 0: - vis_items.append(await self._messages_to_agents_vis(temp_messages)) + vis_items.append(await self._messages_to_agents_vis(temp_messages, True)) temp_messages.clear() return "\n".join(vis_items) - async def _messages_to_agents_vis(self, messages: List[GptsMessage]): + async def _messages_to_agents_vis( + self, messages: List[GptsMessage], is_last_message: bool = False + ): if messages is None or len(messages) <= 0: return "" messages_view = [] @@ -89,10 +160,11 @@ async def _messages_to_agents_vis(self, messages: List[GptsMessage]): action_report_str = message.action_report view_info = message.content if action_report_str and len(action_report_str) > 0: - action_report = json.loads(action_report_str) - if action_report: - view = action_report.get("view", None) - view_info = view if view else action_report.get("content", "") + action_out = ActionOutput.from_dict(json.loads(action_report_str)) + if action_out is not None: + if action_out.is_exe_success or is_last_message: + view = action_out.view + view_info = view if view else action_out.content messages_view.append( { @@ -102,9 +174,8 @@ async def _messages_to_agents_vis(self, messages: List[GptsMessage]): "markdown": view_info, } ) - return await vis_client.get(VisAgentMessages.vis_tag()).display( - content=messages_view - ) + vis_compent = vis_client.get(VisAgentMessages.vis_tag()) + return await vis_compent.display(content=messages_view) async def _messages_to_plan_vis(self, messages: List[Dict]): if messages is None or len(messages) <= 0: diff --git a/dbgpt/agent/resource/resource_api.py b/dbgpt/agent/resource/resource_api.py index 310bc0680..4486845b0 100644 --- a/dbgpt/agent/resource/resource_api.py +++ b/dbgpt/agent/resource/resource_api.py @@ -14,7 +14,10 @@ class ResourceType(Enum): Knowledge = "knowledge" Internet = "internet" Plugin = "plugin" - File = "file" + TextFile = "text_file" + ExcelFile = "excel_file" + ImageFile = "image_file" + AwelFlow = "awel_flow" class AgentResource(BaseModel): @@ -80,7 +83,7 @@ def get_data_type(self, resource: AgentResource) -> str: return "" async def get_resource_prompt( - self, resource: AgentResource, question: Optional[str] = None + self, conv_uid, resource: AgentResource, question: Optional[str] = None ) -> str: return resource.resource_prompt_template().format( data_type=self.get_data_type(resource), diff --git a/dbgpt/serve/agent/agents/controller.py b/dbgpt/serve/agent/agents/controller.py index 7f15d869b..bad2f4cb0 100644 --- a/dbgpt/serve/agent/agents/controller.py +++ b/dbgpt/serve/agent/agents/controller.py @@ -10,7 +10,7 @@ from fastapi.responses import StreamingResponse from dbgpt._private.config import Config -from dbgpt.agent.agents.agent import Agent, AgentContext +from dbgpt.agent.agents.agent_new import Agent, AgentContext from dbgpt.agent.agents.agents_manage import agent_manage from dbgpt.agent.agents.base_agent_new import ConversableAgent from dbgpt.agent.agents.llm.llm import LLMConfig, LLMStrategyType @@ -121,7 +121,7 @@ async def agent_chat( ) ) - asyncio.create_task( + task = asyncio.create_task( multi_agents.agent_team_chat_new( user_query, agent_conv_id, gpt_app, is_retry_chat ) @@ -129,17 +129,19 @@ async def agent_chat( async for chunk in multi_agents.chat_messages(agent_conv_id): if chunk: - logger.info(chunk) try: chunk = json.dumps( {"vis": chunk}, default=serialize, ensure_ascii=False ) - yield f"data: {chunk}\n\n" + if chunk is None or len(chunk) <= 0: + continue + resp = f"data:{chunk}\n\n" + yield task, resp except Exception as e: logger.exception(f"get messages {gpts_name} Exception!" + str(e)) yield f"data: {str(e)}\n\n" - yield f'data:{json.dumps({"vis": "[DONE]"}, default=serialize, ensure_ascii=False)} \n\n' + yield task, f'data:{json.dumps({"vis": "[DONE]"}, default=serialize, ensure_ascii=False)} \n\n' async def app_agent_chat( self, @@ -164,19 +166,30 @@ async def app_agent_chat( current_message.save_to_storage() current_message.start_new_round() current_message.add_user_message(user_query) + agent_conv_id = conv_uid + "_" + str(current_message.chat_order) + agent_task = None try: agent_conv_id = conv_uid + "_" + str(current_message.chat_order) - async for chunk in multi_agents.agent_chat( + async for task, chunk in multi_agents.agent_chat( agent_conv_id, gpts_name, user_query, user_code, sys_code ): + agent_task = task yield chunk - final_message = await self.stable_message(agent_conv_id) + except asyncio.CancelledError: + # Client disconnects + print("Client disconnected") + if agent_task: + logger.info(f"Chat to App {gpts_name}:{agent_conv_id} Cancel!") + agent_task.cancel() except Exception as e: logger.exception(f"Chat to App {gpts_name} Failed!" + str(e)) + raise finally: - logger.info(f"save agent chat info!{conv_uid},{agent_conv_id}") - current_message.add_view_message(final_message) + logger.info(f"save agent chat info!{conv_uid}") + final_message = await self.stable_message(agent_conv_id) + if final_message: + current_message.add_view_message(final_message) current_message.end_current_round() current_message.save_to_storage() @@ -288,7 +301,7 @@ async def chat_messages( ] else False ) - message = await self.memory.one_chat_competions(conv_id) + message = await self.memory.one_chat_competions_v2(conv_id) yield message if is_complete: @@ -308,11 +321,12 @@ async def stable_message( else False ) if is_complete: - return await self.memory.one_chat_competions(conv_id) + return await self.memory.one_chat_competions_v2(conv_id) else: - raise ValueError( - "The conversation has not been completed yet, so we cannot directly obtain information." - ) + pass + # raise ValueError( + # "The conversation has not been completed yet, so we cannot directly obtain information." + # ) else: raise ValueError("No conversation record found!") diff --git a/dbgpt/serve/agent/resource_loader/plugin_hub_load_client.py b/dbgpt/serve/agent/resource_loader/plugin_hub_load_client.py index d7e5b4588..c2923c165 100644 --- a/dbgpt/serve/agent/resource_loader/plugin_hub_load_client.py +++ b/dbgpt/serve/agent/resource_loader/plugin_hub_load_client.py @@ -27,14 +27,15 @@ async def a_load_plugin( self, value: str, plugin_generator: Optional[PluginPromptGenerator] = None ) -> PluginPromptGenerator: logger.info(f"PluginHubLoadClient load plugin:{value}") - plugins_prompt_generator = PluginPromptGenerator() - plugins_prompt_generator.command_registry = CFG.command_registry + if plugin_generator is None: + plugin_generator = PluginPromptGenerator() + plugin_generator.command_registry = CFG.command_registry agent_module = CFG.SYSTEM_APP.get_component( ComponentType.PLUGIN_HUB, ModulePlugin ) - plugins_prompt_generator = agent_module.load_select_plugin( - plugins_prompt_generator, json.dumps(value) + plugin_generator = agent_module.load_select_plugin( + plugin_generator, json.dumps(value) ) - return plugins_prompt_generator + return plugin_generator diff --git a/dbgpt/serve/agent/team/layout/agent_operator.py b/dbgpt/serve/agent/team/layout/agent_operator.py index 86f80c81a..482f78d3a 100644 --- a/dbgpt/serve/agent/team/layout/agent_operator.py +++ b/dbgpt/serve/agent/team/layout/agent_operator.py @@ -1,7 +1,7 @@ from abc import ABC from typing import Dict, List, Optional -from dbgpt.agent.agents.agent import Agent, AgentGenerateContext +from dbgpt.agent.agents.agent_new import Agent, AgentGenerateContext from dbgpt.agent.agents.agents_manage import agent_manage from dbgpt.agent.agents.base_agent_new import ConversableAgent from dbgpt.agent.agents.llm.llm import LLMConfig @@ -191,6 +191,7 @@ async def map( agent_context=input_value.agent_context, resource_loader=input_value.resource_loader, llm_client=input_value.llm_client, + round_index=agent.consecutive_auto_reply_counter, ) async def get_agent( @@ -208,11 +209,19 @@ async def get_agent( llm_config = LLMConfig(llm_client=input_value.llm_client) else: llm_config = LLMConfig(llm_client=self.llm_client) + else: + if not llm_config.llm_client: + if input_value.llm_client: + llm_config.llm_client = input_value.llm_client + else: + llm_config.llm_client = self.llm_client + kwargs = {} if self.awel_agent.role_name: kwargs["name"] = self.awel_agent.role_name if self.awel_agent.fixed_subgoal: kwargs["fixed_subgoal"] = self.awel_agent.fixed_subgoal + agent = ( await agent_cls(**kwargs) .bind(input_value.memory) @@ -222,6 +231,7 @@ async def get_agent( .bind(input_value.resource_loader) .build() ) + return agent diff --git a/dbgpt/serve/agent/team/layout/team_awel_layout_new.py b/dbgpt/serve/agent/team/layout/team_awel_layout_new.py index 7c5013a29..d8533af3f 100644 --- a/dbgpt/serve/agent/team/layout/team_awel_layout_new.py +++ b/dbgpt/serve/agent/team/layout/team_awel_layout_new.py @@ -1,12 +1,12 @@ import json import logging -from typing import Any, List, Optional +from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field, validator from dbgpt._private.config import Config from dbgpt.agent.actions.action import ActionOutput, T -from dbgpt.agent.agents.agent import Agent, AgentContext, AgentGenerateContext +from dbgpt.agent.agents.agent_new import Agent, AgentContext, AgentGenerateContext from dbgpt.agent.agents.base_agent_new import ConversableAgent from dbgpt.agent.agents.base_team import ManagerAgent from dbgpt.core.awel import DAG @@ -35,6 +35,9 @@ def check_dag(cls, value): assert value is not None and value != "", "dag must not be empty" return value + async def _a_process_received_message(self, message: Optional[Dict], sender: Agent): + pass + async def a_act( self, message: Optional[str], @@ -60,7 +63,7 @@ async def a_act( "content": message, "current_goal": message, }, - sender=self, + sender=sender, reviewer=reviewer, memory=self.memory, agent_context=self.agent_context, @@ -73,8 +76,11 @@ async def a_act( last_message = final_generate_context.rely_messages[-1] last_agent = await last_node.get_agent(final_generate_context) + last_agent.consecutive_auto_reply_counter = ( + final_generate_context.round_index + ) await last_agent.a_send( - last_message, self, start_message_context.reviewer, False + last_message, sender, start_message_context.reviewer, False ) return ActionOutput( diff --git a/dbgpt/serve/agent/team/plan/team_auto_plan.py b/dbgpt/serve/agent/team/plan/team_auto_plan.py index a2808bcd6..455193200 100644 --- a/dbgpt/serve/agent/team/plan/team_auto_plan.py +++ b/dbgpt/serve/agent/team/plan/team_auto_plan.py @@ -35,7 +35,9 @@ async def a_process_rely_message( if now_plan.rely and len(now_plan.rely) > 0: rely_tasks_list = now_plan.rely.split(",") - rely_tasks = self.memory.plans_memory.get_by_conv_id_and_num(conv_id, []) + rely_tasks = self.memory.plans_memory.get_by_conv_id_and_num( + conv_id, rely_tasks_list + ) if rely_tasks: rely_prompt = "Read the result data of the dependent steps in the above historical message to complete the current goal:" for rely_task in rely_tasks: