Skip to content

[WIP] Initial A2A Integration #218

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions a2a_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging
from typing import Any
from uuid import uuid4

import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import (
AgentCard,
MessageSendParams,
SendMessageRequest,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
PUBLIC_AGENT_CARD_PATH = "/.well-known/agent.json"
BASE_URL = "http://localhost:9000"


async def main() -> None:
async with httpx.AsyncClient() as httpx_client:
# Initialize A2ACardResolver
resolver = A2ACardResolver(
httpx_client=httpx_client,
base_url=BASE_URL,
)

# Fetch Public Agent Card and Initialize Client
agent_card: AgentCard | None = None

try:
logger.info("Attempting to fetch public agent card from: {} {}", BASE_URL, PUBLIC_AGENT_CARD_PATH)
agent_card = await resolver.get_agent_card() # Fetches from default public path
logger.info("Successfully fetched public agent card:")
logger.info(agent_card.model_dump_json(indent=2, exclude_none=True))
except Exception as e:
logger.exception("Critical error fetching public agent card")
raise RuntimeError("Failed to fetch the public agent card. Cannot continue.") from e

client = A2AClient(httpx_client=httpx_client, agent_card=agent_card)
logger.info("A2AClient initialized.")

send_message_payload: dict[str, Any] = {
"message": {
"role": "user",
"parts": [{"kind": "text", "text": "how much is 10 USD in INR?"}],
"messageId": uuid4().hex,
},
}
request = SendMessageRequest(id=str(uuid4()), params=MessageSendParams(**send_message_payload))

response = await client.send_message(request)
print(response.model_dump(mode="json", exclude_none=True))


if __name__ == "__main__":
import asyncio

asyncio.run(main())
19 changes: 19 additions & 0 deletions a2a_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import logging
import sys

from strands import Agent
from strands.multiagent.a2a import A2AAgent

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
force=True,
)

# Log that we're starting
logging.info("Starting A2A server with root logger")

strands_agent = Agent(model="us.anthropic.claude-3-haiku-20240307-v1:0", callback_handler=None)
strands_a2a_agent = A2AAgent(agent=strands_agent, name="Hello World Agent", description="Just a hello world agent")
strands_a2a_agent.serve()
28 changes: 21 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ dependencies = [
"pydantic>=2.0.0,<3.0.0",
"typing-extensions>=4.13.2,<5.0.0",
"watchdog>=6.0.0,<7.0.0",
"opentelemetry-api>=1.30.0,<2.0.0",
"opentelemetry-sdk>=1.30.0,<2.0.0",
"opentelemetry-exporter-otlp-proto-http>=1.30.0,<2.0.0",
"opentelemetry-api>=1.33.0,<2.0.0",
"opentelemetry-sdk>=1.33.0,<2.0.0",
]

[project.urls]
Expand Down Expand Up @@ -78,13 +77,22 @@ ollama = [
openai = [
"openai>=1.68.0,<2.0.0",
]
otel-export = [
"opentelemetry-exporter-otlp-proto-http>=1.33.0,<2.0.0",
]
a2a = [
"a2a-sdk>=0.2.6",
"uvicorn>=0.34.2",
"httpx>=0.28.1",
"fastapi>=0.115.12",
]

[tool.hatch.version]
# Tells Hatch to use your version control system (git) to determine the version.
source = "vcs"

[tool.hatch.envs.hatch-static-analysis]
features = ["anthropic", "litellm", "llamaapi", "ollama", "openai"]
features = ["anthropic", "litellm", "llamaapi", "ollama", "openai", "otel-export", "a2a"]
dependencies = [
"mypy>=1.15.0,<2.0.0",
"ruff>=0.11.6,<0.12.0",
Expand All @@ -107,7 +115,7 @@ lint-fix = [
]

[tool.hatch.envs.hatch-test]
features = ["anthropic", "litellm", "llamaapi", "ollama", "openai"]
features = ["anthropic", "litellm", "llamaapi", "ollama", "openai", "otel-export"]
extra-dependencies = [
"moto>=5.1.0,<6.0.0",
"pytest>=8.0.0,<9.0.0",
Expand All @@ -123,9 +131,11 @@ extra-args = [

[tool.hatch.envs.dev]
dev-mode = true
features = ["dev", "docs", "anthropic", "litellm", "llamaapi", "ollama"]

features = ["dev", "docs", "anthropic", "litellm", "llamaapi", "ollama", "otel-export"]

[tool.hatch.envs.a2a]
dev-mode = true
features = ["dev", "a2a"]

[[tool.hatch.envs.hatch-test.matrix]]
python = ["3.13", "3.12", "3.11", "3.10"]
Expand Down Expand Up @@ -191,6 +201,9 @@ ignore_missing_imports = true
line-length = 120
include = ["examples/**/*.py", "src/**/*.py", "tests/**/*.py", "tests-integ/**/*.py"]

[tool.ruff.format]
docstring-code-format = true

[tool.ruff.lint]
select = [
"B", # flake8-bugbear
Expand All @@ -213,6 +226,7 @@ testpaths = [
"tests"
]
asyncio_default_fixture_loop_scope = "function"
norecursedirs = ["multiagent/a2a"]

[tool.coverage.run]
branch = true
Expand Down
13 changes: 13 additions & 0 deletions src/strands/multiagent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Multiagent capabilities for Strands Agents.

This module provides support for multiagent systems, including agent-to-agent (A2A)
communication protocols and coordination mechanisms.

Submodules:
a2a: Implementation of the Agent-to-Agent (A2A) protocol, which enables
standardized communication between agents.
"""

from . import a2a

__all__ = ["a2a"]
14 changes: 14 additions & 0 deletions src/strands/multiagent/a2a/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Agent-to-Agent (A2A) communication protocol implementation for Strands Agents.

This module provides classes and utilities for enabling Strands Agents to communicate
with other agents using the Agent-to-Agent (A2A) protocol.

Docs: https://google-a2a.github.io/A2A/latest/

Classes:
A2AAgent: A wrapper that adapts a Strands Agent to be A2A-compatible.
"""

from .agent import A2AAgent

__all__ = ["A2AAgent"]
139 changes: 139 additions & 0 deletions src/strands/multiagent/a2a/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""A2A-compatible wrapper for Strands Agent.

This module provides the A2AAgent class, which adapts a Strands Agent to the A2A protocol,
allowing it to be used in A2A-compatible systems.
"""

import logging
from typing import Any, Literal

import uvicorn
from a2a.server.apps import A2AFastAPIApplication, A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from fastapi import FastAPI
from starlette.applications import Starlette

from ...agent.agent import Agent as SAAgent
from .executor import StrandsA2AExecutor

log = logging.getLogger(__name__)


class A2AAgent:
"""A2A-compatible wrapper for Strands Agent."""

def __init__(
self,
agent: SAAgent,
*,
name: str,
description: str,
host: str = "localhost",
port: int = 9000,
version: str = "0.0.1",
):
"""Initialize an A2A-compatible agent from a Strands agent.

Args:
agent: The Strands Agent to wrap with A2A compatibility.
name: The name of the agent, used in the AgentCard.
description: A description of the agent's capabilities, used in the AgentCard.
host: The hostname or IP address to bind the A2A server to. Defaults to "localhost".
port: The port to bind the A2A server to. Defaults to 9000.
version: The version of the agent. Defaults to "0.0.1".
"""
self.name = name
self.description = description
self.host = host
self.port = port
self.http_url = f"http://{self.host}:{self.port}/"
self.version = version
self.strands_agent = agent
self.capabilities = AgentCapabilities()
self.request_handler = DefaultRequestHandler(
agent_executor=StrandsA2AExecutor(self.strands_agent),
task_store=InMemoryTaskStore(),
)

@property
def public_agent_card(self) -> AgentCard:
"""Get the public AgentCard for this agent.

The AgentCard contains metadata about the agent, including its name,
description, URL, version, skills, and capabilities. This information
is used by other agents and systems to discover and interact with this agent.

Returns:
AgentCard: The public agent card containing metadata about this agent.
"""
return AgentCard(
name=self.name,
description=self.description,
url=self.http_url,
version=self.version,
skills=self.agent_skills,
defaultInputModes=["text"],
defaultOutputModes=["text"],
capabilities=self.capabilities,
)

@property
def agent_skills(self) -> list[AgentSkill]:
"""Get the list of skills this agent provides.

Skills represent specific capabilities that the agent can perform.
Strands agent tools are adapted to A2A skills.

Returns:
list[AgentSkill]: A list of skills this agent provides.
"""
return []

def to_starlette_app(self) -> Starlette:
"""Create a Starlette application for serving this agent via HTTP.

This method creates a Starlette application that can be used to serve
the agent via HTTP using the A2A protocol.

Returns:
Starlette: A Starlette application configured to serve this agent.
"""
starlette_app = A2AStarletteApplication(agent_card=self.public_agent_card, http_handler=self.request_handler)
return starlette_app.build()

def to_fastapi_app(self) -> FastAPI:
"""Create a FastAPI application for serving this agent via HTTP.

This method creates a FastAPI application that can be used to serve
the agent via HTTP using the A2A protocol.

Returns:
FastAPI: A FastAPI application configured to serve this agent.
"""
fastapi_app = A2AFastAPIApplication(agent_card=self.public_agent_card, http_handler=self.request_handler)
return fastapi_app.build()

def serve(self, app_type: Literal["fastapi", "starlette"] = "starlette", **kwargs: Any) -> None:
"""Start the A2A server with the specified application type.

This method starts an HTTP server that exposes the agent via the A2A protocol.
The server can be implemented using either FastAPI or Starlette, depending on
the specified app_type.

Args:
app_type: The type of application to serve, either "fastapi" or "starlette".
Defaults to "starlette".
**kwargs: Additional keyword arguments to pass to uvicorn.run.
"""
try:
log.info("Starting Strands agent A2A server...")
if app_type == "fastapi":
uvicorn.run(self.to_fastapi_app(), host=self.host, port=self.port, **kwargs)
else:
uvicorn.run(self.to_starlette_app(), host=self.host, port=self.port, **kwargs)
except KeyboardInterrupt:
log.warning("Server shutdown requested (KeyboardInterrupt).")
finally:
log.info("Strands agent A2A server has shutdown.")
67 changes: 67 additions & 0 deletions src/strands/multiagent/a2a/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Strands Agent executor for the A2A protocol.

This module provides the StrandsA2AExecutor class, which adapts a Strands Agent
to be used as an executor in the A2A protocol. It handles the execution of agent
requests and the conversion of Strands Agent responses to A2A events.
"""

import logging

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import UnsupportedOperationError
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError

from ...agent.agent import Agent as SAAgent
from ...agent.agent_result import AgentResult as SAAgentResult

log = logging.getLogger(__name__)


class StrandsA2AExecutor(AgentExecutor):
"""Executor that adapts a Strands Agent to the A2A protocol."""

def __init__(self, agent: SAAgent):
"""Initialize a StrandsA2AExecutor.

Args:
agent: The Strands Agent to adapt to the A2A protocol.
"""
self.agent = agent

async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
"""Execute a request using the Strands Agent and send the response as A2A events.

This method executes the user's input using the Strands Agent and converts
the agent's response to A2A events, which are then sent to the event queue.

Args:
context: The A2A request context, containing the user's input and other metadata.
event_queue: The A2A event queue, used to send response events.
"""
result: SAAgentResult = self.agent(context.get_user_input())
if result.message and "content" in result.message:
for content_block in result.message["content"]:
if "text" in content_block:
await event_queue.enqueue_event(new_agent_text_message(content_block["text"]))

async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Cancel an ongoing execution.

This method is called when a request is cancelled. Currently, cancellation
is not supported, so this method raises an UnsupportedOperationError.

Args:
context: The A2A request context.
event_queue: The A2A event queue.

Raises:
ServerError: Always raised with an UnsupportedOperationError, as cancellation
is not currently supported.
"""
raise ServerError(error=UnsupportedOperationError())
Loading