Skip to content

Commit

Permalink
feat(agent): Add trace for agent
Browse files Browse the repository at this point in the history
  • Loading branch information
fangyinc committed Apr 10, 2024
1 parent 7d6dfd9 commit f01ad36
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 81 deletions.
279 changes: 212 additions & 67 deletions dbgpt/agent/core/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dbgpt._private.pydantic import Field
from dbgpt.core import LLMClient, ModelMessageRoleType
from dbgpt.util.error_types import LLMChatError
from dbgpt.util.tracer import SpanType, root_tracer
from dbgpt.util.utils import colored

from ..actions.action import Action, ActionOutput
Expand Down Expand Up @@ -199,13 +200,25 @@ async def send(
is_recovery: Optional[bool] = False,
) -> None:
"""Send a message to recipient agent."""
await recipient.receive(
message=message,
sender=self,
reviewer=reviewer,
request_reply=request_reply,
is_recovery=is_recovery,
)
with root_tracer.start_span(
"agent.send",
metadata={
"sender": self.get_name(),
"recipient": recipient.get_name(),
"reviewer": reviewer.get_name() if reviewer else None,
"agent_message": message.to_dict(),
"request_reply": request_reply,
"is_recovery": is_recovery,
"conv_uid": self.not_null_agent_context.conv_id,
},
):
await recipient.receive(
message=message,
sender=self,
reviewer=reviewer,
request_reply=request_reply,
is_recovery=is_recovery,
)

async def receive(
self,
Expand All @@ -217,16 +230,30 @@ async def receive(
is_recovery: Optional[bool] = False,
) -> None:
"""Receive a message from another agent."""
await self._a_process_received_message(message, sender)
if request_reply is False or request_reply is None:
return
with root_tracer.start_span(
"agent.receive",
metadata={
"sender": sender.get_name(),
"recipient": self.get_name(),
"reviewer": reviewer.get_name() if reviewer else None,
"agent_message": message.to_dict(),
"request_reply": request_reply,
"silent": silent,
"is_recovery": is_recovery,
"conv_uid": self.not_null_agent_context.conv_id,
"is_human": self.is_human,
},
):
await self._a_process_received_message(message, sender)
if request_reply is False or request_reply is None:
return

if not self.is_human:
reply = await self.generate_reply(
received_message=message, sender=sender, reviewer=reviewer
)
if reply is not None:
await self.send(reply, sender)
if not self.is_human:
reply = await self.generate_reply(
received_message=message, sender=sender, reviewer=reviewer
)
if reply is not None:
await self.send(reply, sender)

def prepare_act_param(self) -> Dict[str, Any]:
"""Prepare the parameters for the act method."""
Expand All @@ -244,13 +271,44 @@ async def generate_reply(
logger.info(
f"generate agent reply!sender={sender}, rely_messages_len={rely_messages}"
)
root_span = root_tracer.start_span(
"agent.generate_reply",
metadata={
"sender": sender.get_name(),
"recipient": self.get_name(),
"reviewer": reviewer.get_name() if reviewer else None,
"received_message": received_message.to_dict(),
"conv_uid": self.not_null_agent_context.conv_id,
"rely_messages": [msg.to_dict() for msg in rely_messages]
if rely_messages
else None,
},
)

try:
reply_message: AgentMessage = self._init_reply_message(
received_message=received_message
)
await self._system_message_assembly(
received_message.content, reply_message.context
)
with root_tracer.start_span(
"agent.generate_reply._init_reply_message",
metadata={
"received_message": received_message.to_dict(),
},
) as span:
# initialize reply message
reply_message: AgentMessage = self._init_reply_message(
received_message=received_message
)
span.metadata["reply_message"] = reply_message.to_dict()

with root_tracer.start_span(
"agent.generate_reply._system_message_assembly",
metadata={
"reply_message": reply_message.to_dict(),
},
) as span:
# assemble system message
await self._system_message_assembly(
received_message.content, reply_message.context
)
span.metadata["assembled_system_messages"] = self.oai_system_message

fail_reason = None
current_retry_counter = 0
Expand All @@ -270,36 +328,73 @@ async def generate_reply(
retry_message, self, reviewer, request_reply=False
)

# 1.Think about how to do things
llm_reply, model_name = await self.thinking(
self._load_thinking_messages(
received_message, sender, rely_messages
)
)
reply_message.model_name = model_name
reply_message.content = llm_reply

# 2.Review whether what is being done is legal
approve, comments = await self.review(llm_reply, self)
reply_message.review_info = AgentReviewInfo(
approve=approve,
comments=comments,
thinking_messages = self._load_thinking_messages(
received_message, sender, rely_messages
)
with root_tracer.start_span(
"agent.generate_reply.thinking",
metadata={
"thinking_messages": [
msg.to_dict() for msg in thinking_messages
],
},
) as span:
# 1.Think about how to do things
llm_reply, model_name = await self.thinking(thinking_messages)
reply_message.model_name = model_name
reply_message.content = llm_reply
span.metadata["llm_reply"] = llm_reply
span.metadata["model_name"] = model_name

with root_tracer.start_span(
"agent.generate_reply.review",
metadata={"llm_reply": llm_reply, "censored": self.get_name()},
) as span:
# 2.Review whether what is being done is legal
approve, comments = await self.review(llm_reply, self)
reply_message.review_info = AgentReviewInfo(
approve=approve,
comments=comments,
)
span.metadata["approve"] = approve
span.metadata["comments"] = comments

# 3.Act based on the results of your thinking
act_extent_param = self.prepare_act_param()
act_out: Optional[ActionOutput] = await self.act(
message=llm_reply,
sender=sender,
reviewer=reviewer,
**act_extent_param,
)
if act_out:
reply_message.action_report = act_out.dict()

# 4.Reply information verification
check_pass, reason = await self.verify(reply_message, sender, reviewer)
is_success = check_pass
with root_tracer.start_span(
"agent.generate_reply.act",
metadata={
"llm_reply": llm_reply,
"sender": sender.get_name(),
"reviewer": reviewer.get_name() if reviewer else None,
"act_extent_param": act_extent_param,
},
) as span:
# 3.Act based on the results of your thinking
act_out: Optional[ActionOutput] = await self.act(
message=llm_reply,
sender=sender,
reviewer=reviewer,
**act_extent_param,
)
if act_out:
reply_message.action_report = act_out.dict()
span.metadata["action_report"] = act_out.dict() if act_out else None

with root_tracer.start_span(
"agent.generate_reply.verify",
metadata={
"llm_reply": llm_reply,
"sender": sender.get_name(),
"reviewer": reviewer.get_name() if reviewer else None,
},
) as span:
# 4.Reply information verification
check_pass, reason = await self.verify(
reply_message, sender, reviewer
)
is_success = check_pass
span.metadata["check_pass"] = check_pass
span.metadata["reason"] = reason
# 5.Optimize wrong answers myself
if not check_pass:
current_retry_counter += 1
Expand All @@ -319,6 +414,9 @@ async def generate_reply(
err_message = AgentMessage(content=str(e))
err_message.success = False
return err_message
finally:
root_span.metadata["reply_message"] = reply_message.to_dict()
root_span.end()

async def thinking(
self, messages: List[AgentMessage], prompt: Optional[str] = None
Expand Down Expand Up @@ -378,7 +476,7 @@ async def act(
) -> Optional[ActionOutput]:
"""Perform actions."""
last_out: Optional[ActionOutput] = None
for action in self.actions:
for i, action in enumerate(self.actions):
# Select the resources required by acton
need_resource = None
if self.resources and len(self.resources) > 0:
Expand All @@ -390,12 +488,27 @@ async def act(
if not message:
raise ValueError("The message content is empty!")

last_out = await action.run(
ai_message=message,
resource=need_resource,
rely_action_out=last_out,
**kwargs,
)
with root_tracer.start_span(
"agent.act.run",
metadata={
"message": message,
"sender": sender.get_name() if sender else None,
"recipient": self.get_name(),
"reviewer": reviewer.get_name() if reviewer else None,
"need_resource": need_resource.to_dict() if need_resource else None,
"rely_action_out": last_out.dict() if last_out else None,
"conv_uid": self.not_null_agent_context.conv_id,
"action_index": i,
"total_action": len(self.actions),
},
) as span:
last_out = await action.run(
ai_message=message,
resource=need_resource,
rely_action_out=last_out,
**kwargs,
)
span.metadata["action_out"] = last_out.dict() if last_out else None
return last_out

async def correctness_check(
Expand Down Expand Up @@ -446,12 +559,24 @@ async def initiate_chat(
reviewer (Agent): The reviewer agent.
message (str): The message to send.
"""
await self.send(
AgentMessage(content=message, current_goal=message),
recipient,
reviewer,
request_reply=True,
)
agent_message = AgentMessage(content=message, current_goal=message)
with root_tracer.start_span(
"agent.initiate_chat",
span_type=SpanType.AGENT,
metadata={
"sender": self.get_name(),
"recipient": recipient.get_name(),
"reviewer": reviewer.get_name() if reviewer else None,
"agent_message": agent_message.to_dict(),
"conv_uid": self.not_null_agent_context.conv_id,
},
):
await self.send(
agent_message,
recipient,
reviewer,
request_reply=True,
)

#######################################################################
# Private Function Begin
Expand Down Expand Up @@ -506,8 +631,15 @@ async def _a_append_message(
model_name=oai_message.get("model_name", None),
)

self.memory.message_memory.append(gpts_message)
return True
with root_tracer.start_span(
"agent.save_message_to_memory",
metadata={
"gpts_message": gpts_message.to_dict(),
"conv_uid": self.not_null_agent_context.conv_id,
},
):
self.memory.message_memory.append(gpts_message)
return True

def _print_received_message(self, message: AgentMessage, sender: Agent):
# print the message received
Expand Down Expand Up @@ -711,14 +843,27 @@ def _load_thinking_messages(

# Convert and tailor the information in collective memory into contextual
# memory available to the current Agent
current_goal_messages = self._convert_to_ai_message(
self.memory.message_memory.get_between_agents(

with root_tracer.start_span(
"agent._load_thinking_messages",
metadata={
"sender": sender.get_name(),
"recipient": self.get_name(),
"conv_uid": self.not_null_agent_context.conv_id,
"current_goal": current_goal,
},
) as span:
# Get historical information from the memory
memory_messages = self.memory.message_memory.get_between_agents(
self.not_null_agent_context.conv_id,
self.profile,
sender.get_profile(),
current_goal,
)
)
span.metadata["memory_messages"] = [
message.to_dict() for message in memory_messages
]
current_goal_messages = self._convert_to_ai_message(memory_messages)

# When there is no target and context, the current received message is used as
# the target problem
Expand Down
2 changes: 1 addition & 1 deletion dbgpt/app/dbgpt_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ def run_webserver(param: WebServerParameters = None):
if not param:
param = _get_webserver_params()
initialize_tracer(
system_app,
os.path.join(LOGDIR, param.tracer_file),
system_app=system_app,
tracer_storage_cls=param.tracer_storage_cls,
)

Expand Down
Loading

0 comments on commit f01ad36

Please sign in to comment.