You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I am attempting to build a dynamic graph that will build a proper graph based on the plan that is passed in to the method. These plans are in following format:
PerplexicaAgent and GmailAgent are seperately hosted entities.
[
{
'tool_name': 'PerplexicaAgent',
'prompt': 'Conduct detailed research on otters, including their habitat, behavior, diet, and conservation status.'
},
{
'tool_name': 'GmailAgent',
'prompt': 'Send an email to [email protected] with a summary of the research on otters, including their habitat, behavior, diet, and conservation status.'
}
]
The plan is then passed into the GraphBuilder:
import base64
import operator
from typing import List, Dict, Any
from functools import partial
from langgraph.graph import StateGraph, START, END
from langgraph.graph.state import CompiledStateGraph
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import BaseMessage, AIMessage
from langchain.tools import BaseTool
from datastorage.models import fetch_registry, get_llm_from_db, get_config_from_db, get_user_config
from langchain.chat_models import init_chat_model
from tools.extrenal_api_agent import ExternalAPIAgent
from tools.langserve_api_agent import LangserveAPIAgent
from typing import *
# Initialize the LLM
llm_config = get_llm_from_db("azure_openai_gpt4")
filtered_config = {k: v for k, v in llm_config.items() if v is not None}
llm = init_chat_model(**filtered_config)
class GraphState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
sender: str
class Agent:
def __init__(self, name: str, prompt: str, llm: str, tools: List[BaseTool]):
self.name = name
self.prompt = prompt
self.tools = tools
prompt_template = ChatPromptTemplate.from_messages([
("system", prompt),
MessagesPlaceholder(variable_name="messages"),
])
self.agent = prompt_template |llm.bind_tools(tools)
class GraphBuilder:
def __init__(self, plan: Dict[str, Any]):
self.plan = plan
self.agents: List[Agent] = []
self.graph: CompiledStateGraph | None = None
self.agent_registry = fetch_registry()
def _create_agent_node(self, agent: Agent):
return partial(self._agent_node, agent=agent.agent, name=agent.name)
@staticmethod
def _agent_node(state: Dict[str, Any], agent, name: str):
try:
result = agent.invoke(state)
# logger.debug(f"Agent {name} returned: {result}")
return {
"messages": [
AIMessage(
content=result["messages"][-1].content,
name=name
)
]
}
except Exception as e:
# logger.error(f"Error in agent_node for {name}: {e}")
raise
def _add_nodes(self, state_graph: StateGraph):
for agent in self.agents:
state_graph.add_node(agent.name, self._create_agent_node(agent))
def _add_edges(self, state_graph: StateGraph):
state_graph.add_edge(START, self.agents[0]) # Start with the first agent
for i in range(len(self.agents) - 1):
state_graph.add_edge(self.agents[i], self.agents[i + 1])
state_graph.add_edge(self.agents[-1], END) # End with the last agent
def build(self):
# Parse JSON and create agents
state_graph = StateGraph(GraphState)
plan_list = self.plan['plans']
for task in plan_list:
agent_name = task["tool_name"]
self.agents.append(agent_name)
# Determine if the agent is in the registry
agent_info = get_agent_info(agent_name)
if agent_info:
# Agent is in the registry
agent_api_url = agent_info.get("agent_api")
agent_description = agent_info.get("agent_description")
is_langserve = agent_info.get("langserve", False)
if is_langserve:
# Create LangserveAPIAgent
# logger.debug(f"Creating LangserveAPIAgent for {agent_name}")
langserve_agent = LangserveAPIAgent(
api_url=agent_api_url,
agent_name=agent_name
)
agent_node = partial(
self._agent_node, agent=langserve_agent, name=agent_name
)
state_graph.add_node(agent_name, agent_node)
elif agent_api_url:
# Create ExternalAPIAgent
# logger.debug(f"Creating ExternalAPIAgent for {agent_name}")
external_agent = ExternalAPIAgent(
api_url=agent_api_url,
agent_name=agent_name
)
agent_node = partial(
self._agent_node, agent=external_agent, name=agent_name
)
state_graph.add_node(agent_name, agent_node)
# Add edges to the graph
self._add_edges(state_graph)
self.graph = state_graph.compile()
self.save_image("mermaid.png")
return self.graph
def save_image(self, filepath, xray=True, mermaid=True):
if not self.graph:
raise ValueError("Graph not created. Call build() first.")
# Get the image data
img_data = self.graph.get_graph(xray=xray)
img_data = img_data.draw_mermaid_png() if mermaid else img_data.draw_png()
# Write the image data to the specified file path
with open(filepath, "wb") as image_file:
image_file.write(img_data)
# def get_stream(self, initial_state: Dict[str, Any], user: User):
# if not self.graph:
# raise ValueError("Graph not created. Call build() first.")
# return self.graph.stream(initial_state, config={"recursion_limit": 150, **get_llm_config(user)})
# Initialize the GraphBuilder with the plan
# Create the graph
def build_sample(json_plan):
graph_builder = GraphBuilder(plan=json_plan)
compiled_graph = graph_builder.build()
# Visualize the graph
# graph_image_base64 = graph_builder.save_image("mermaid.png")
# print(graph_image_base64) # Base64-encoded graph image
return compiled_graph
def get_agent_info(agent_name: str):
# Fetch agent info from the registry
agent_registry = fetch_registry()
# agent_registry is assumed to be a list of agent info dictionaries
for agent_info in agent_registry:
if agent_info["agent_name"] == agent_name:
return agent_info
return None
The problem that i am running into is that when a users query involves contacting an agent twice, for example:
Fetch my most recent email (Gmail), do research on that topic (perplexica), and respond to the email (Gmail)
i recieve the following error: raise ValueError(f"Node{node}already present.")
I am trying to find a way around this, i have considered creating a subgraph containing only the node, dynamically setting edges to allow for a return to another agent, as well as ditching langgraph and creating my own agentexecutor to acquire these responses.
Allowing this application to contact an agent more than once in a graph is a crucial part to my use case and would love any help or input.
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
I am attempting to build a dynamic graph that will build a proper graph based on the plan that is passed in to the method. These plans are in following format:
PerplexicaAgent and GmailAgent are seperately hosted entities.
The plan is then passed into the GraphBuilder:
The problem that i am running into is that when a users query involves contacting an agent twice, for example:
Fetch my most recent email (Gmail), do research on that topic (perplexica), and respond to the email (Gmail)
i recieve the following error:
raise ValueError(f"Node
{node}already present.")
I am trying to find a way around this, i have considered creating a subgraph containing only the node, dynamically setting edges to allow for a return to another agent, as well as ditching langgraph and creating my own agentexecutor to acquire these responses.
Allowing this application to contact an agent more than once in a graph is a crucial part to my use case and would love any help or input.
Beta Was this translation helpful? Give feedback.
All reactions