-
Notifications
You must be signed in to change notification settings - Fork 228
Description
Problem Statement
The Agent
class provides the stream_async
function that returns an AsyncIterator
:
sdk-python/src/strands/agent/agent.py
Line 356 in 77f5fa7
async def stream_async(self, prompt: str, **kwargs: Any) -> AsyncIterator[Any]: |
The current stream_async
implementation calls _run_loop
and uses threads with a queue to yield streamed data events from the Strands Agents event loop:
sdk-python/src/strands/agent/agent.py
Line 423 in 77f5fa7
def _run_loop( |
sdk-python/src/strands/agent/agent.py
Lines 409 to 421 in 77f5fa7
thread = Thread(target=target_callback, daemon=True) | |
thread.start() | |
try: | |
while True: | |
item = await queue.get() | |
if item == _stop_event: | |
break | |
if isinstance(item, BaseException): | |
raise item | |
yield item | |
finally: | |
thread.join() |
The problem is that this threading implementation doesn't work with Python features like thread local contextvars. Certain Python libraries (e.g. https://github.com/UKGovernmentBEIS/inspect_ai) use thread local context vars and require them to function correctly. The only reasonable solution for correctly supporting concurrency within a single thread is async.
This issue is to propose, discuss, and refine a true async implementation of the Agent.stream_async
function and Strands Agent event loop.
Proposed Solution
No response
Use Case
- Thread local contextvars
- A more maintainable implementation of async agent requests
- Allows for future enhancements that enable streamed tool results
Alternatives Solutions
No response
Additional Context
No response