
Conversation

@jamsea (Contributor) commented Oct 22, 2025

Problem

When an InterruptionFrame is received (e.g., when a user interrupts the bot mid-response), it triggers task cancellation via asyncio.CancelledError. However, the OpenAI AsyncStream[ChatCompletionChunk] was not being properly closed, which could lead to:

  • Connection leaks
  • Inconsistent stream state
  • Potential resource exhaustion over time

Root Cause

The _process_context() method in base_llm.py iterates over the SSE stream with:

```python
async for chunk in chunk_stream:
    # Process chunks...
```

When an InterruptionFrame arrives, the frame processor cancels the current process task, but the stream is not explicitly closed, leaving the underlying HTTP/SSE connection in an inconsistent state.
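For illustration, here is a minimal, self-contained sketch of the failure mode (not pipecat code; the model name, prompt, and the consume()/main() helpers are placeholders). Cancelling the consuming task while it is suspended inside the loop means nothing after the loop ever runs:

```python
import asyncio

from openai import AsyncOpenAI  # official openai SDK, v1.x


async def consume(client: AsyncOpenAI) -> None:
    # With stream=True this returns an AsyncStream[ChatCompletionChunk].
    chunk_stream = await client.chat.completions.create(
        model="gpt-4o",  # placeholder model
        messages=[{"role": "user", "content": "Tell me a long story."}],
        stream=True,
    )
    async for chunk in chunk_stream:
        pass  # push tokens downstream
    # If the task is cancelled while suspended in the loop above, execution
    # never reaches this point, so the SSE connection is never released.


async def main() -> None:
    client = AsyncOpenAI()
    task = asyncio.create_task(consume(client))
    await asyncio.sleep(0.5)
    task.cancel()  # roughly what an InterruptionFrame ends up triggering
    try:
        await task
    except asyncio.CancelledError:
        # The AsyncStream object is still holding its HTTP connection here.
        pass


asyncio.run(main())
```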

Solution

This PR wraps the entire stream processing section with proper exception handling:

  1. Try block: Contains all the stream iteration and processing logic
  2. Except block: Catches asyncio.CancelledError, logs it for debugging, and re-raises it to maintain proper cancellation propagation
  3. Finally block: Ensures chunk_stream.close() is always called to properly clean up the SSE connection
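In condensed form, the resulting structure looks roughly like the sketch below (not the exact diff; the log message text and logger name are illustrative):

```python
try:
    async for chunk in chunk_stream:
        ...  # existing chunk handling (tokens, function calls, usage metrics)
except asyncio.CancelledError:
    # The task was cancelled by an interruption: log it, then re-raise so
    # cancellation keeps propagating through the frame processor.
    logger.debug("LLM chunk stream processing cancelled")
    raise
finally:
    # Always release the underlying SSE connection.
    await chunk_stream.close()
```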

Changes

  • Added try/except/finally around the async for chunk in chunk_stream: loop
  • Added explicit await chunk_stream.close() in the finally block
  • Added debug logging when cancellation occurs
  • Preserved all existing functionality while ensuring proper resource cleanup
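To illustrate how this cleanup path could be exercised by a test, here is a hypothetical pytest sketch (not part of this PR; FakeStream, the test name, and the pytest-asyncio dependency are all assumptions):

```python
import asyncio

import pytest


class FakeStream:
    """Stand-in for openai.AsyncStream[ChatCompletionChunk]."""

    def __init__(self) -> None:
        self.closed = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Never yields a chunk; lets the test cancel mid-iteration.
        await asyncio.sleep(3600)

    async def close(self) -> None:
        self.closed = True


@pytest.mark.asyncio
async def test_stream_closed_when_consumer_is_cancelled():
    stream = FakeStream()

    async def consume() -> None:
        try:
            async for _ in stream:
                pass
        finally:
            await stream.close()  # mirrors the fix: always close the stream

    task = asyncio.create_task(consume())
    await asyncio.sleep(0)  # let the task start iterating
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task
    assert stream.closed
```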

When an InterruptionFrame causes task cancellation, the OpenAI AsyncStream
was not being properly closed, which could lead to connection leaks and
inconsistent stream state.

This fix wraps the stream processing in a try/except/finally block to:
- Catch asyncio.CancelledError from interruptions
- Ensure the stream is always closed via chunk_stream.close()
- Log the cancellation for debugging purposes

This prevents SSE connection leaks when users interrupt the bot mid-response.
codecov bot commented Oct 22, 2025

Codecov Report

❌ Patch coverage is 0% with 43 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/pipecat/services/openai/base_llm.py | 0.00% | 43 Missing ⚠️ |

| Files with missing lines | Coverage Δ |
|---|---|
| src/pipecat/services/openai/base_llm.py | 32.91% <0.00%> (-1.06%) ⬇️ |

@jamsea jamsea self-assigned this Oct 22, 2025
@jamsea jamsea marked this pull request as ready for review October 22, 2025 01:40
Review comment on the diff in src/pipecat/services/openai/base_llm.py:

```python
    raise
finally:
    # Ensure the SSE stream is properly closed to avoid connection leaks
    await chunk_stream.close()
```

@jamsea (Contributor, Author) commented:

@aconchillo are we intentionally not calling this function presently? The main change I'm interested in is line 431, adding a debug log

