Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add multi-agent adapter to integrations #255

Merged
merged 16 commits into from
Mar 11, 2024
46 changes: 46 additions & 0 deletions integrations/multi-agent-adapter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Multi-Agent adapter system

This Multi-Agent adapter architecture can be useful when you have many agents and you would like to send messages to all of them, then would like to wait and collect all the responses from all of them, and send back a final result/options at once to DeltaV.

### Step 1:
Create a Blank Agent on Agentverse:
https://fetch.ai/docs/guides/agentverse/creating-a-hosted-agent
Click on the newly created blank agent and paste content of the `src/agents/ai_agent.py` into the editor.
The name of the agent can be for example `AI Agent`.

### Step 2:
Run your AI Agent by clicking on `Run`:
https://fetch.ai/docs/guides/agentverse/creating-a-hosted-agent#create-your-first-hosted-agent-on-the-agentverse

### Step 3 (optional):
You can repeat step 1 and 2 to create many AI Agents.

### Step 4:
Create another Blank Agent on Agentverse:
https://fetch.ai/docs/guides/agentverse/creating-a-hosted-agent
Click on the newly created blank agent and paste content of the `src/agents/adapter.py` into the editor.
The name of the agent can be for example `AI Agents Adapter`.

### Step 5:
On line 14 add your AI Agent address(es):

```python
ctx.storage.set("ai_agent_addresses", "{YOUR_AI_AGENT_ADDRESS(ES) AS A LIST OF STRINGS}")
```

### Step 6:
Run your AI Agents adapter by clicking on `Run`:
https://fetch.ai/docs/guides/agentverse/creating-a-hosted-agent#create-your-first-hosted-agent-on-the-agentverse

### Step 7:
Register your newly created AI Agents Adapter as a Service on Agentverse:
https://fetch.ai/docs/guides/agentverse/registering-agent-services
Name of the service can be for example: `AI Agents adapter`
Description: `AI Agents adapter that sends prompt to multiple AI Agents and collects responses from them.`
`prompt` field description: `The prompt the user wants the AI Agents to generate response for. You must ask the user what the prompt is, don't pre fill it.`

### Step 8:
Go to DeltaV (https://fetch.ai/docs/guides/deltav/deltav-chat-interface)
and specify an objective similar to this: `Generate text with multiple AI Agents`
After you select the `AI Agents adapter` option you will be asked for the prompt.
After confirming the context you should be able to see the responses generated by all your AI Agents!
6 changes: 6 additions & 0 deletions integrations/multi-agent-adapter/project.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"title": "Multi-Agent adapter",
"description": "Multi-Agent adapter example can be useful when you have many agents and you would like to collect responses from all of them at once using DeltaV.",
"categories": ["Multi-Agent System", "AI", "DeltaV", "Text Generation"],
"deltav": true
}
92 changes: 92 additions & 0 deletions integrations/multi-agent-adapter/src/agents/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from ai_engine import UAgentResponse, UAgentResponseType


class AIRequest(Model):
prompt: str


class AIResponse(Model):
response: str


adapter_protocol = Protocol(f"AI Agent Adapter protocol")


@agent.on_event("startup")
async def start_ai_adapter(ctx: Context):
# TODO: SET YOUR AI AGENT ADDRESSES HERE!
ctx.storage.set("ai_agent_addresses", [])
ctx.storage.set("pending_ai_agents_responses", {})


@adapter_protocol.on_message(model=AIRequest, replies=UAgentResponse)
async def send_prompt_to_ai_agents(ctx: Context, sender: str, msg: AIRequest):
# save sender (=DeltaV) address so that we can send back response later
ctx.storage.set("deltav-sender-address", sender)
ai_agent_addresses = ctx.storage.get("ai_agent_addresses")
ctx.logger.info(
f"AI Agent addresses which we are sending request to: {ai_agent_addresses}"
)
for address in ai_agent_addresses:
ctx.logger.info(f"Forwarding prompt {msg.prompt} to AI Agent {address}")
await ctx.send(address, msg)


@adapter_protocol.on_message(model=AIResponse)
async def process_response_from_ai_agent(ctx: Context, sender: str, msg: AIResponse):
ctx.logger.info(f"Response from AI agent: {msg.response}")

# get all AI Agent responses from storage that belong to those DeltaV sessions
# for which we have already got some of the responses from our AI Agents but not all of them (= pending DeltaV sessions)
pending_ai_agents_responses = ctx.storage.get("pending_ai_agents_responses")
session_str = str(ctx.session)
if session_str in pending_ai_agents_responses:
# get storage entry for current DeltaV session from agent storage
# because another AI agent has already sent back its response to this Adapter via this message handler
# so an entry for this DeltaV session has already been created before in agent storage
ai_agents_responses_session = pending_ai_agents_responses[session_str]
else:
# Adapter hasn't got any messages from any of the AI Agents for this DeltaV session via this message handler so far
# so let's create a new entry for this session
ai_agents_responses_session = {}
# add response of AI Agent to session dictionary
ai_agents_responses_session[sender] = msg.response
ctx.logger.info(
f"""All AI Agent responses that belong to those DeltaV sessions
for which we have got some of the responses but not all of them: {pending_ai_agents_responses}"""
)

# number of responses for current DeltaV session is equal to the number of AI Agents
# it means we have received the response from all AI Agents
if len(ai_agents_responses_session) == len(ctx.storage.get("ai_agent_addresses")):
ctx.logger.info(
f"Sending response back to DeltaV for session {session_str} since we have received the response from all AI Agents"
)

# concatenate all AI responses into a final response string
final_ai_responses = ", ".join(
[resp for resp in ai_agents_responses_session.values()]
)
# send final response back to DeltaV
await ctx.send(
ctx.storage.get("deltav-sender-address"),
UAgentResponse(
message=f"AI Agents' responses: {final_ai_responses}",
type=UAgentResponseType.FINAL,
),
)

# we don't need to store responses for this session anymore
# so we can remove those AI responses from agent storage that were collected for current DeltaV session
pending_ai_agents_responses.pop(session_str)
# save updated AI responses data (without current session responses) in agent storage
ctx.storage.set("pending_ai_agents_responses", pending_ai_agents_responses)
return

# update all AI responses data with the one specific for this session
pending_ai_agents_responses[session_str] = ai_agents_responses_session
# save updated AI responses data in agent storage
ctx.storage.set("pending_ai_agents_responses", pending_ai_agents_responses)


agent.include(adapter_protocol)
16 changes: 16 additions & 0 deletions integrations/multi-agent-adapter/src/agents/ai_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
class AIRequest(Model):
prompt: str


class AIResponse(Model):
response: str


@agent.on_message(model=AIRequest, replies=AIResponse)
async def respond_to_adapter(ctx: Context, sender: str, msg: AIRequest):
ctx.logger.info(f"Message from AI Adapter: {msg.prompt}")

# TODO: implement AI Agent logic here

# TODO: send back the actual AI response to the Adapter agent
await ctx.send(sender, AIResponse(response="AI response"))
Loading