Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 30 additions & 23 deletions src/agents/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,35 +185,42 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]:
- A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
- A GuardrailTripwireTriggered exception if a guardrail is tripped.
"""
while True:
self._check_errors()
if self._stored_exception:
logger.debug("Breaking due to stored exception")
self.is_complete = True
break
try:
while True:
self._check_errors()
if self._stored_exception:
logger.debug("Breaking due to stored exception")
self.is_complete = True
break

if self.is_complete and self._event_queue.empty():
break
if self.is_complete and self._event_queue.empty():
break

try:
item = await self._event_queue.get()
except asyncio.CancelledError:
break
try:
item = await self._event_queue.get()
except asyncio.CancelledError:
break

if isinstance(item, QueueCompleteSentinel):
# Await input guardrails if they are still running, so late exceptions are captured.
await self._await_task_safely(self._input_guardrails_task)
if isinstance(item, QueueCompleteSentinel):
# Await input guardrails if they are still running, so late
# exceptions are captured.
await self._await_task_safely(self._input_guardrails_task)

self._event_queue.task_done()
self._event_queue.task_done()

# Check for errors, in case the queue was completed due to an exception
self._check_errors()
break

yield item
self._event_queue.task_done()
# Check for errors, in case the queue was completed
# due to an exception
self._check_errors()
break

self._cleanup_tasks()
yield item
self._event_queue.task_done()
finally:
# Ensure main execution completes before cleanup to avoid race conditions
# with session operations
await self._await_task_safely(self._run_impl_task)
# Safely terminate all background tasks after main execution has finished
self._cleanup_tasks()

if self._stored_exception:
raise self._stored_exception
Expand Down