diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cbf4a57ea..8934dfbe26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -154,6 +154,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `apply_text_normalization` was incorrectly set as a query parameter. It's now being added as a request parameter. +- Updated all STT and TTS services to use consistent error handling pattern with + `push_error()` method for better pipeline error event integration. + - Fixed an issue where `RimeHttpTTSService` and `PiperTTSService` could generate incorrectly 16-bit aligned audio frames, potentially leading to internal errors or static audio. diff --git a/src/pipecat/services/assemblyai/stt.py b/src/pipecat/services/assemblyai/stt.py index b3f20800c1..d78a428417 100644 --- a/src/pipecat/services/assemblyai/stt.py +++ b/src/pipecat/services/assemblyai/stt.py @@ -21,6 +21,7 @@ from pipecat.frames.frames import ( CancelFrame, EndFrame, + ErrorFrame, Frame, InterimTranscriptionFrame, StartFrame, @@ -205,8 +206,9 @@ async def _connect(self): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"Failed to connect to AssemblyAI: {e}") + logger.error(f"{self} exception: {e}") self._connected = False + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) raise async def _disconnect(self): @@ -231,7 +233,8 @@ async def _disconnect(self): logger.warning("Timed out waiting for termination message from server") except Exception as e: - logger.warning(f"Error during termination handshake: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) if self._receive_task: await self.cancel_task(self._receive_task) @@ -239,7 +242,8 @@ async def _disconnect(self): await self._websocket.close() except Exception as e: - logger.error(f"Error during disconnect: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._websocket = None @@ -258,11 +262,13 @@ async def _receive_task_handler(self): except websockets.exceptions.ConnectionClosedOK: break except Exception as e: - logger.error(f"Error processing WebSocket message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) break except Exception as e: - logger.error(f"Fatal error in receive handler: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) def _parse_message(self, message: Dict[str, Any]) -> BaseMessage: """Parse a raw message into the appropriate message type.""" @@ -291,7 +297,8 @@ async def _handle_message(self, message: Dict[str, Any]): elif isinstance(parsed_message, TerminationMessage): await self._handle_termination(parsed_message) except Exception as e: - logger.error(f"Error handling message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def _handle_termination(self, message: TerminationMessage): """Handle termination message.""" diff --git a/src/pipecat/services/asyncai/tts.py b/src/pipecat/services/asyncai/tts.py index 3e4ff33cc2..b2099eb18f 100644 --- a/src/pipecat/services/asyncai/tts.py +++ b/src/pipecat/services/asyncai/tts.py @@ -238,7 +238,8 @@ async def _connect_websocket(self): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -250,7 +251,8 @@ async def _disconnect_websocket(self): logger.debug("Disconnecting from Async") await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._websocket = None self._started = False @@ -298,7 +300,7 @@ async def _receive_messages(self): logger.error(f"{self} error: {msg}") await self.push_frame(TTSStoppedFrame()) await self.stop_all_metrics() - await self.push_error(ErrorFrame(f"{self} error: {msg['message']}")) + await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}")) else: logger.error(f"{self} error, unknown message type: {msg}") @@ -343,7 +345,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self._get_websocket().send(msg) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -351,6 +354,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield None except Exception as e: logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) class AsyncAIHttpTTSService(TTSService): @@ -484,7 +488,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: if response.status != 200: error_text = await response.text() logger.error(f"Async API error: {error_text}") - await self.push_error(ErrorFrame(f"Async API error: {error_text}")) + await self.push_error(ErrorFrame(error=f"Async API error: {error_text}")) raise Exception(f"Async API returned status {response.status}: {error_text}") audio_data = await response.read() @@ -501,7 +505,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: except Exception as e: logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(f"Error generating TTS: {e}")) + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() diff --git a/src/pipecat/services/aws/stt.py b/src/pipecat/services/aws/stt.py index b019fc0585..a579a54c0d 100644 --- a/src/pipecat/services/aws/stt.py +++ b/src/pipecat/services/aws/stt.py @@ -140,7 +140,8 @@ async def start(self, frame: StartFrame): return logger.warning("WebSocket connection not established after connect") except Exception as e: - logger.error(f"Failed to connect (attempt {retry_count + 1}/{max_retries}): {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) retry_count += 1 if retry_count < max_retries: await asyncio.sleep(1) # Wait before retrying @@ -181,8 +182,8 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: try: await self._connect() except Exception as e: - logger.error(f"Failed to reconnect: {e}") - yield ErrorFrame("Failed to reconnect to AWS Transcribe", fatal=False) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") return # Format the audio data according to AWS event stream format @@ -199,13 +200,13 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: await self._disconnect() # Don't yield error here - we'll retry on next frame except Exception as e: - logger.error(f"Error sending audio: {e}") - yield ErrorFrame(f"AWS Transcribe error: {str(e)}", fatal=False) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") await self._disconnect() except Exception as e: - logger.error(f"Error in run_stt: {e}") - yield ErrorFrame(f"AWS Transcribe error: {str(e)}", fatal=False) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") await self._disconnect() async def _connect(self): @@ -288,7 +289,8 @@ async def _connect(self): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} Failed to connect to AWS Transcribe: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) await self._disconnect() raise @@ -308,7 +310,8 @@ async def _disconnect(self): await self._ws_client.send(json.dumps(end_stream)) await self._ws_client.close() except Exception as e: - logger.warning(f"{self} Error closing WebSocket connection: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._ws_client = None await self._call_event_handler("on_disconnected") @@ -527,9 +530,7 @@ async def _receive_loop(self): elif headers.get(":message-type") == "exception": error_msg = payload.get("Message", "Unknown error") logger.error(f"{self} Exception from AWS: {error_msg}") - await self.push_frame( - ErrorFrame(f"AWS Transcribe error: {error_msg}", fatal=False) - ) + await self.push_frame(ErrorFrame(f"AWS Transcribe error: {error_msg}")) else: logger.debug(f"{self} Other message type received: {headers}") logger.debug(f"{self} Payload: {payload}") @@ -537,5 +538,6 @@ async def _receive_loop(self): logger.error(f"{self} WebSocket connection closed in receive loop: {e}") break except Exception as e: - logger.error(f"{self} Unexpected error in receive loop: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) break diff --git a/src/pipecat/services/azure/stt.py b/src/pipecat/services/azure/stt.py index 586a94e442..85a0508a0a 100644 --- a/src/pipecat/services/azure/stt.py +++ b/src/pipecat/services/azure/stt.py @@ -18,6 +18,7 @@ from pipecat.frames.frames import ( CancelFrame, EndFrame, + ErrorFrame, Frame, InterimTranscriptionFrame, StartFrame, @@ -111,13 +112,17 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: audio: Raw audio bytes to process. Yields: - None - actual transcription frames are pushed via callbacks. + Frame: Either None for successful processing or ErrorFrame on failure. """ - await self.start_processing_metrics() - await self.start_ttfb_metrics() - if self._audio_stream: - self._audio_stream.write(audio) - yield None + try: + await self.start_processing_metrics() + await self.start_ttfb_metrics() + if self._audio_stream: + self._audio_stream.write(audio) + yield None + except Exception as e: + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") async def start(self, frame: StartFrame): """Start the speech recognition service. @@ -133,17 +138,21 @@ async def start(self, frame: StartFrame): if self._audio_stream: return - stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1) - self._audio_stream = PushAudioInputStream(stream_format) + try: + stream_format = AudioStreamFormat(samples_per_second=self.sample_rate, channels=1) + self._audio_stream = PushAudioInputStream(stream_format) - audio_config = AudioConfig(stream=self._audio_stream) + audio_config = AudioConfig(stream=self._audio_stream) - self._speech_recognizer = SpeechRecognizer( - speech_config=self._speech_config, audio_config=audio_config - ) - self._speech_recognizer.recognizing.connect(self._on_handle_recognizing) - self._speech_recognizer.recognized.connect(self._on_handle_recognized) - self._speech_recognizer.start_continuous_recognition_async() + self._speech_recognizer = SpeechRecognizer( + speech_config=self._speech_config, audio_config=audio_config + ) + self._speech_recognizer.recognizing.connect(self._on_handle_recognizing) + self._speech_recognizer.recognized.connect(self._on_handle_recognized) + self._speech_recognizer.start_continuous_recognition_async() + except Exception as e: + logger.error(f"{self} exception during initialization: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def stop(self, frame: EndFrame): """Stop the speech recognition service. diff --git a/src/pipecat/services/azure/tts.py b/src/pipecat/services/azure/tts.py index 15b4f1256e..c4ee1580ce 100644 --- a/src/pipecat/services/azure/tts.py +++ b/src/pipecat/services/azure/tts.py @@ -328,7 +328,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: if self._speech_synthesizer is None: error_msg = "Speech synthesizer not initialized." logger.error(error_msg) - yield ErrorFrame(error_msg) + yield ErrorFrame(error=error_msg) return try: @@ -355,13 +355,15 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield TTSStoppedFrame() except Exception as e: - logger.error(f"{self} error during synthesis: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) yield TTSStoppedFrame() # Could add reconnection logic here if needed return except Exception as e: logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) class AzureHttpTTSService(AzureBaseTTSService): @@ -439,3 +441,4 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.warning(f"Speech synthesis canceled: {cancellation_details.reason}") if cancellation_details.reason == CancellationReason.Error: logger.error(f"{self} error: {cancellation_details.error_details}") + yield ErrorFrame(error=f"{self} error: {cancellation_details.error_details}") diff --git a/src/pipecat/services/cartesia/stt.py b/src/pipecat/services/cartesia/stt.py index b4e232c4ac..a2ae9432f2 100644 --- a/src/pipecat/services/cartesia/stt.py +++ b/src/pipecat/services/cartesia/stt.py @@ -20,6 +20,7 @@ from pipecat.frames.frames import ( CancelFrame, EndFrame, + ErrorFrame, Frame, InterimTranscriptionFrame, StartFrame, @@ -275,7 +276,8 @@ async def _connect_websocket(self): self._websocket = await websocket_connect(ws_url, additional_headers=headers) await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self}: unable to connect to Cartesia: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def _disconnect_websocket(self): try: @@ -284,6 +286,7 @@ async def _disconnect_websocket(self): await self._websocket.close() except Exception as e: logger.error(f"{self} error closing websocket: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._websocket = None await self._call_event_handler("on_disconnected") @@ -315,7 +318,9 @@ async def _process_response(self, data): await self._on_transcript(data) elif data["type"] == "error": - logger.error(f"Cartesia error: {data.get('message', 'Unknown error')}") + error_msg = data.get("message", "Unknown error") + logger.error(f"Cartesia error: {error_msg}") + await self.push_error(ErrorFrame(error=error_msg)) @traced_stt async def _handle_transcription( diff --git a/src/pipecat/services/cartesia/tts.py b/src/pipecat/services/cartesia/tts.py index 90f0ac3b49..5f7bba24ad 100644 --- a/src/pipecat/services/cartesia/tts.py +++ b/src/pipecat/services/cartesia/tts.py @@ -350,7 +350,8 @@ async def _connect_websocket(self): ) await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -362,7 +363,8 @@ async def _disconnect_websocket(self): logger.debug("Disconnecting from Cartesia") await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._context_id = None self._websocket = None @@ -418,7 +420,7 @@ async def _process_messages(self): logger.error(f"{self} error: {msg}") await self.push_frame(TTSStoppedFrame()) await self.stop_all_metrics() - await self.push_error(ErrorFrame(f"{self} error: {msg['error']}")) + await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}")) self._context_id = None else: logger.error(f"{self} error, unknown message type: {msg}") @@ -459,7 +461,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self._get_websocket().send(msg) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -467,6 +470,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield None except Exception as e: logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) class CartesiaHttpTTSService(TTSService): @@ -648,7 +652,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: if response.status != 200: error_text = await response.text() logger.error(f"Cartesia API error: {error_text}") - await self.push_error(ErrorFrame(f"Cartesia API error: {error_text}")) + await self.push_error(ErrorFrame(error=f"Cartesia API error: {error_text}")) raise Exception(f"Cartesia API returned status {response.status}: {error_text}") audio_data = await response.read() @@ -665,7 +669,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: except Exception as e: logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(f"Error generating TTS: {e}")) + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index f0b1a5baa9..58420ceb6e 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -184,7 +184,8 @@ async def _disconnect(self): await self._disconnect_websocket() except Exception as e: - logger.error(f"Error during disconnect: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: # Reset state only after everything is cleaned up self._websocket = None @@ -207,7 +208,8 @@ async def _connect_websocket(self): logger.debug("Connected to Deepgram Flux Websocket") await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -226,6 +228,7 @@ async def _disconnect_websocket(self): await self._websocket.close() except Exception as e: logger.error(f"{self} error closing websocket: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._websocket = None await self._call_event_handler("on_disconnected") @@ -326,14 +329,14 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: """ if not self._websocket: logger.error("Not connected to Deepgram Flux.") - yield ErrorFrame("Not connected to Deepgram Flux.", fatal=True) + yield ErrorFrame("Not connected to Deepgram Flux.") return try: await self._websocket.send(audio) except Exception as e: - logger.error(f"Failed to send audio to Flux: {e}") - yield ErrorFrame(f"Failed to send audio to Flux: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") return yield None @@ -410,7 +413,8 @@ async def _receive_messages(self): # Skip malformed messages continue except Exception as e: - logger.error(f"Error processing message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) # Error will be handled inside WebsocketService->_receive_task_handler raise else: diff --git a/src/pipecat/services/deepgram/stt.py b/src/pipecat/services/deepgram/stt.py index fb5f670298..bc6ecfa631 100644 --- a/src/pipecat/services/deepgram/stt.py +++ b/src/pipecat/services/deepgram/stt.py @@ -256,7 +256,7 @@ async def start_metrics(self): async def _on_error(self, *args, **kwargs): error: ErrorResponse = kwargs["error"] logger.warning(f"{self} connection error, will retry: {error}") - await self.push_error(ErrorFrame(f"{error}")) + await self.push_error(ErrorFrame(error=f"{error}")) await self.stop_all_metrics() # NOTE(aleix): we don't disconnect (i.e. call finish on the connection) # because this triggers more errors internally in the Deepgram SDK. So, diff --git a/src/pipecat/services/deepgram/tts.py b/src/pipecat/services/deepgram/tts.py index 5819e4123d..1d134c135f 100644 --- a/src/pipecat/services/deepgram/tts.py +++ b/src/pipecat/services/deepgram/tts.py @@ -115,5 +115,5 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield TTSStoppedFrame() except Exception as e: - logger.exception(f"{self} exception: {e}") - yield ErrorFrame(f"Error getting audio: {str(e)}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) diff --git a/src/pipecat/services/elevenlabs/stt.py b/src/pipecat/services/elevenlabs/stt.py index 291bad4142..ded2f6517b 100644 --- a/src/pipecat/services/elevenlabs/stt.py +++ b/src/pipecat/services/elevenlabs/stt.py @@ -335,5 +335,5 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: ) except Exception as e: - logger.error(f"ElevenLabs STT error: {e}") - yield ErrorFrame(f"ElevenLabs STT error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") diff --git a/src/pipecat/services/elevenlabs/tts.py b/src/pipecat/services/elevenlabs/tts.py index 460b23d18b..716d4a9296 100644 --- a/src/pipecat/services/elevenlabs/tts.py +++ b/src/pipecat/services/elevenlabs/tts.py @@ -419,7 +419,8 @@ async def _update_settings(self, settings: Mapping[str, Any]): json.dumps({"context_id": self._context_id, "close_context": True}) ) except Exception as e: - logger.warning(f"Error closing context for voice settings update: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._context_id = None self._started = False @@ -530,8 +531,9 @@ async def _connect_websocket(self): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") self._websocket = None + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) await self._call_event_handler("on_connection_error", f"{e}") async def _disconnect_websocket(self): @@ -546,7 +548,8 @@ async def _disconnect_websocket(self): await self._websocket.close() logger.debug("Disconnected from ElevenLabs") except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._started = False self._context_id = None @@ -576,7 +579,8 @@ async def _handle_interruption(self, frame: InterruptionFrame, direction: FrameD json.dumps({"context_id": self._context_id, "close_context": True}) ) except Exception as e: - logger.error(f"Error closing context on interruption: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._context_id = None self._started = False self._partial_word = "" @@ -726,13 +730,15 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: else: await self._send_text(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") yield TTSStoppedFrame() + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._started = False return yield None except Exception as e: logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) class ElevenLabsHttpTTSService(WordTTSService): @@ -1067,7 +1073,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.warning(f"Failed to parse JSON from stream: {e}") continue except Exception as e: - logger.error(f"Error processing response: {e}", exc_info=True) + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) continue # After processing all chunks, emit any remaining partial word @@ -1091,8 +1098,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: self._previous_text = text except Exception as e: - logger.error(f"Error in run_tts: {e}") - yield ErrorFrame(error=str(e)) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") finally: await self.stop_ttfb_metrics() # Let the parent class handle TTSStoppedFrame diff --git a/src/pipecat/services/fal/stt.py b/src/pipecat/services/fal/stt.py index 202c03c1bb..cb67af0ac1 100644 --- a/src/pipecat/services/fal/stt.py +++ b/src/pipecat/services/fal/stt.py @@ -298,5 +298,5 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: ) except Exception as e: - logger.error(f"Fal Wizper error: {e}") - yield ErrorFrame(f"Fal Wizper error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") diff --git a/src/pipecat/services/fish/tts.py b/src/pipecat/services/fish/tts.py index 669d2ce974..5fe1299981 100644 --- a/src/pipecat/services/fish/tts.py +++ b/src/pipecat/services/fish/tts.py @@ -228,7 +228,8 @@ async def _connect_websocket(self): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"Fish Audio initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -242,7 +243,8 @@ async def _disconnect_websocket(self): await self._websocket.send(ormsgpack.packb(stop_message)) await self._websocket.close() except Exception as e: - logger.error(f"Error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._request_id = None self._started = False @@ -284,7 +286,8 @@ async def _receive_messages(self): continue except Exception as e: - logger.error(f"Error processing message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) @traced_tts async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: @@ -320,7 +323,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: flush_message = {"event": "flush"} await self._get_websocket().send(ormsgpack.packb(flush_message)) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -328,5 +332,5 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield None except Exception as e: - logger.error(f"Error generating TTS: {e}") - yield ErrorFrame(f"Error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") diff --git a/src/pipecat/services/gladia/stt.py b/src/pipecat/services/gladia/stt.py index f9ff91b4a6..bc3aa7f5c7 100644 --- a/src/pipecat/services/gladia/stt.py +++ b/src/pipecat/services/gladia/stt.py @@ -23,6 +23,7 @@ from pipecat.frames.frames import ( CancelFrame, EndFrame, + ErrorFrame, Frame, InterimTranscriptionFrame, StartFrame, @@ -477,7 +478,8 @@ async def _connection_handler(self): break except Exception as e: - logger.error(f"Error in connection handler: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._connection_active = False if not self._should_reconnect: @@ -567,7 +569,8 @@ async def _keepalive_task_handler(self): except websockets.exceptions.ConnectionClosed: logger.debug("Connection closed during keepalive") except Exception as e: - logger.error(f"Error in Gladia keepalive task: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def _receive_task_handler(self): try: @@ -630,7 +633,8 @@ async def _receive_task_handler(self): # Expected when closing the connection pass except Exception as e: - logger.error(f"Error in Gladia WebSocket handler: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def _maybe_reconnect(self) -> bool: """Handle exponential backoff reconnection logic.""" diff --git a/src/pipecat/services/google/gemini_live/llm.py b/src/pipecat/services/google/gemini_live/llm.py index 4b5f052098..d27649e2b5 100644 --- a/src/pipecat/services/google/gemini_live/llm.py +++ b/src/pipecat/services/google/gemini_live/llm.py @@ -1008,7 +1008,7 @@ async def _connect(self, session_resumption_handle: Optional[str] = None): self._connection_task = self.create_task(self._connection_task_handler(config=config)) except Exception as e: - await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}", fatal=True)) + await self.push_error(ErrorFrame(error=f"{self} Initialization error: {e}")) async def _connection_task_handler(self, config: LiveConnectConfig): async with self._client.aio.live.connect(model=self._model_name, config=config) as session: @@ -1089,9 +1089,7 @@ async def _handle_connection_error(self, error: Exception) -> bool: f"Max consecutive failures ({MAX_CONSECUTIVE_FAILURES}) reached, " "treating as fatal error" ) - await self.push_error( - ErrorFrame(error=f"{self} Error in receive loop: {error}", fatal=True) - ) + await self.push_error(ErrorFrame(error=f"{self} Error in receive loop: {error}")) return False else: logger.info( @@ -1549,7 +1547,7 @@ async def _handle_send_error(self, error: Exception): # cost/stability implications for a service cluster, let's just treat a # send-side error as fatal. if not self._disconnecting: - await self.push_error(ErrorFrame(error=f"{self} Send error: {error}", fatal=True)) + await self.push_error(ErrorFrame(error=f"{self} Send error: {error}")) def create_context_aggregator( self, diff --git a/src/pipecat/services/google/stt.py b/src/pipecat/services/google/stt.py index b9e56f55bf..5509951aea 100644 --- a/src/pipecat/services/google/stt.py +++ b/src/pipecat/services/google/stt.py @@ -773,7 +773,8 @@ async def _request_generator(self): yield cloud_speech.StreamingRecognizeRequest(audio=audio_data) except Exception as e: - logger.error(f"Error in request generator: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) raise async def _stream_audio(self): @@ -804,14 +805,15 @@ async def _stream_audio(self): break except Exception as e: - logger.warning(f"{self} Reconnecting: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) await asyncio.sleep(1) # Brief delay before reconnecting self._stream_start_time = int(time.time() * 1000) except Exception as e: - logger.error(f"Error in streaming task: {e}") - await self.push_frame(ErrorFrame(str(e))) + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: """Process an audio chunk for STT transcription. @@ -887,7 +889,8 @@ async def _process_responses(self, streaming_recognize): ) ) except Exception as e: - logger.error(f"Error processing Google STT responses: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) # Re-raise the exception to let it propagate (e.g. in the case of a # timeout, propagate to _stream_audio to reconnect) raise diff --git a/src/pipecat/services/google/tts.py b/src/pipecat/services/google/tts.py index bfda3292a4..11b28019cf 100644 --- a/src/pipecat/services/google/tts.py +++ b/src/pipecat/services/google/tts.py @@ -467,7 +467,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield TTSStoppedFrame() except Exception as e: - logger.exception(f"{self} error generating TTS: {e}") + logger.error(f"{self} exception: {e}") error_message = f"TTS generation error: {str(e)}" yield ErrorFrame(error=error_message) @@ -667,7 +667,7 @@ async def request_generator(): yield TTSStoppedFrame() except Exception as e: - logger.exception(f"{self} error generating TTS: {e}") + logger.error(f"{self} exception: {e}") error_message = f"TTS generation error: {str(e)}" yield ErrorFrame(error=error_message) @@ -916,6 +916,6 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield TTSStoppedFrame() except Exception as e: - logger.exception(f"{self} error generating TTS: {e}") + logger.error(f"{self} exception: {e}") error_message = f"Gemini TTS generation error: {str(e)}" yield ErrorFrame(error=error_message) diff --git a/src/pipecat/services/groq/tts.py b/src/pipecat/services/groq/tts.py index 68ba4a5986..801ab2089d 100644 --- a/src/pipecat/services/groq/tts.py +++ b/src/pipecat/services/groq/tts.py @@ -13,7 +13,13 @@ from loguru import logger from pydantic import BaseModel -from pipecat.frames.frames import Frame, TTSAudioRawFrame, TTSStartedFrame, TTSStoppedFrame +from pipecat.frames.frames import ( + ErrorFrame, + Frame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, +) from pipecat.services.tts_service import TTSService from pipecat.transcriptions.language import Language from pipecat.utils.tracing.service_decorators import traced_tts @@ -141,5 +147,6 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield TTSAudioRawFrame(bytes, frame_rate, channels) except Exception as e: logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) yield TTSStoppedFrame() diff --git a/src/pipecat/services/hume/tts.py b/src/pipecat/services/hume/tts.py index 2701c5d05d..32c636a4d6 100644 --- a/src/pipecat/services/hume/tts.py +++ b/src/pipecat/services/hume/tts.py @@ -212,8 +212,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: self._audio_bytes = b"" except Exception as e: - logger.exception(f"{self} error generating TTS: {e}") - await self.push_error(ErrorFrame(f"Error generating TTS: {e}")) + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: # Ensure TTFB timer is stopped even on early failures await self.stop_ttfb_metrics() diff --git a/src/pipecat/services/inworld/tts.py b/src/pipecat/services/inworld/tts.py index eef1440e31..e68e0bfb38 100644 --- a/src/pipecat/services/inworld/tts.py +++ b/src/pipecat/services/inworld/tts.py @@ -365,7 +365,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: if response.status != 200: error_text = await response.text() logger.error(f"Inworld API error: {error_text}") - await self.push_error(ErrorFrame(f"Inworld API error: {error_text}")) + await self.push_error(ErrorFrame(error=f"Inworld API error: {error_text}")) return # ================================================================================ @@ -393,7 +393,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: # ================================================================================ # Log any unexpected errors and notify the pipeline logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(f"Error generating TTS: {e}")) + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: # ================================================================================ # STEP 8: CLEANUP AND COMPLETION @@ -508,7 +508,7 @@ async def _process_non_streaming_response( # Extract the base64-encoded audio content from response if "audioContent" not in response_data: logger.error("No audioContent in Inworld API response") - await self.push_error(ErrorFrame("No audioContent in response")) + await self.push_error(ErrorFrame(error="No audioContent in response")) return # ================================================================================ diff --git a/src/pipecat/services/lmnt/tts.py b/src/pipecat/services/lmnt/tts.py index 9f9fef5fca..c7b6e46c63 100644 --- a/src/pipecat/services/lmnt/tts.py +++ b/src/pipecat/services/lmnt/tts.py @@ -224,7 +224,8 @@ async def _connect_websocket(self): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -240,7 +241,8 @@ async def _disconnect_websocket(self): # await self._websocket.send(json.dumps({"eof": True})) await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._started = False self._websocket = None @@ -277,7 +279,7 @@ async def _receive_messages(self): logger.error(f"{self} error: {msg['error']}") await self.push_frame(TTSStoppedFrame()) await self.stop_all_metrics() - await self.push_error(ErrorFrame(f"{self} error: {msg['error']}")) + await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}")) return except json.JSONDecodeError: logger.error(f"Invalid JSON message: {message}") @@ -310,7 +312,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self._get_websocket().send(json.dumps({"flush": True})) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -318,3 +321,4 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield None except Exception as e: logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) diff --git a/src/pipecat/services/minimax/tts.py b/src/pipecat/services/minimax/tts.py index cd63fe7614..c5f6c5a25c 100644 --- a/src/pipecat/services/minimax/tts.py +++ b/src/pipecat/services/minimax/tts.py @@ -351,8 +351,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: continue except Exception as e: - logger.exception(f"Error generating TTS: {e}") - yield ErrorFrame(error=f"MiniMax TTS error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() diff --git a/src/pipecat/services/neuphonic/tts.py b/src/pipecat/services/neuphonic/tts.py index 6ccdfe17f0..993b941876 100644 --- a/src/pipecat/services/neuphonic/tts.py +++ b/src/pipecat/services/neuphonic/tts.py @@ -296,7 +296,8 @@ async def _connect_websocket(self): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -309,7 +310,8 @@ async def _disconnect_websocket(self): logger.debug("Disconnecting from Neuphonic") await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._started = False self._websocket = None @@ -374,7 +376,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self._send_text(text) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -382,6 +385,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield None except Exception as e: logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) class NeuphonicHttpTTSService(TTSService): @@ -575,7 +579,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield TTSAudioRawFrame(audio_bytes, self.sample_rate, 1) except Exception as e: - logger.error(f"Error processing SSE message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) # Don't yield error frame for individual message failures continue @@ -583,8 +588,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: logger.debug("TTS generation cancelled") raise except Exception as e: - logger.exception(f"Error in run_tts: {e}") - yield ErrorFrame(error=f"Neuphonic TTS error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() diff --git a/src/pipecat/services/openai/realtime/llm.py b/src/pipecat/services/openai/realtime/llm.py index 8b3d500ebd..f50eefb4d8 100644 --- a/src/pipecat/services/openai/realtime/llm.py +++ b/src/pipecat/services/openai/realtime/llm.py @@ -455,7 +455,7 @@ async def _ws_send(self, realtime_message): # it is to recover from a send-side error with proper state management, and that exponential # backoff for retries can have cost/stability implications for a service cluster, let's just # treat a send-side error as fatal. - await self.push_error(ErrorFrame(error=f"Error sending client event: {e}", fatal=True)) + await self.push_error(ErrorFrame(error=f"Error sending client event: {e}")) async def _update_settings(self): settings = self._session_properties @@ -646,9 +646,7 @@ async def _handle_evt_response_done(self, evt): self._current_assistant_response = None # error handling if evt.response.status == "failed": - await self.push_error( - ErrorFrame(error=evt.response.status_details["error"]["message"], fatal=True) - ) + await self.push_error(ErrorFrame(error=evt.response.status_details["error"]["message"])) return # response content for item in evt.response.output: @@ -745,7 +743,7 @@ async def _maybe_handle_evt_retrieve_conversation_item_error(self, evt: events.E async def _handle_evt_error(self, evt): # Errors are fatal to this connection. Send an ErrorFrame. - await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True)) + await self.push_error(ErrorFrame(error=f"Error: {evt}")) # # state and client events for the current conversation diff --git a/src/pipecat/services/openai/tts.py b/src/pipecat/services/openai/tts.py index cdf0d11ac2..23cb75324b 100644 --- a/src/pipecat/services/openai/tts.py +++ b/src/pipecat/services/openai/tts.py @@ -190,7 +190,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: f"{self} error getting audio (status: {r.status_code}, error: {error})" ) yield ErrorFrame( - f"Error getting audio (status: {r.status_code}, error: {error})" + error=f"Error getting audio (status: {r.status_code}, error: {error})" ) return @@ -207,3 +207,4 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield TTSStoppedFrame() except BadRequestError as e: logger.exception(f"{self} error generating TTS: {e}") + yield ErrorFrame(error=f"{self} error: {e}") diff --git a/src/pipecat/services/openai_realtime_beta/openai.py b/src/pipecat/services/openai_realtime_beta/openai.py index 922f9a5726..af0600882e 100644 --- a/src/pipecat/services/openai_realtime_beta/openai.py +++ b/src/pipecat/services/openai_realtime_beta/openai.py @@ -454,7 +454,7 @@ async def _ws_send(self, realtime_message): # it is to recover from a send-side error with proper state management, and that exponential # backoff for retries can have cost/stability implications for a service cluster, let's just # treat a send-side error as fatal. - await self.push_error(ErrorFrame(error=f"Error sending client event: {e}", fatal=True)) + await self.push_error(ErrorFrame(error=f"Error sending client event: {e}")) async def _update_settings(self): settings = self._session_properties @@ -627,9 +627,7 @@ async def _handle_evt_response_done(self, evt): self._current_assistant_response = None # error handling if evt.response.status == "failed": - await self.push_error( - ErrorFrame(error=evt.response.status_details["error"]["message"], fatal=True) - ) + await self.push_error(ErrorFrame(error=evt.response.status_details["error"]["message"])) return # response content for item in evt.response.output: @@ -687,7 +685,7 @@ async def _maybe_handle_evt_retrieve_conversation_item_error(self, evt: events.E async def _handle_evt_error(self, evt): # Errors are fatal to this connection. Send an ErrorFrame. - await self.push_error(ErrorFrame(error=f"Error: {evt}", fatal=True)) + await self.push_error(ErrorFrame(error=f"Error: {evt}")) async def _handle_assistant_output(self, output): # We haven't seen intermixed audio and function_call items in the same response. But let's diff --git a/src/pipecat/services/piper/tts.py b/src/pipecat/services/piper/tts.py index fa43a720c2..dd842ff11f 100644 --- a/src/pipecat/services/piper/tts.py +++ b/src/pipecat/services/piper/tts.py @@ -92,7 +92,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: f"{self} error getting audio (status: {response.status}, error: {error})" ) yield ErrorFrame( - f"Error getting audio (status: {response.status}, error: {error})" + error=f"Error getting audio (status: {response.status}, error: {error})" ) return @@ -108,8 +108,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self.stop_ttfb_metrics() yield frame except Exception as e: - logger.error(f"Error in run_tts: {e}") - yield ErrorFrame(error=str(e)) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") finally: logger.debug(f"{self}: Finished TTS [{text}]") await self.stop_ttfb_metrics() diff --git a/src/pipecat/services/playht/tts.py b/src/pipecat/services/playht/tts.py index 9254807948..4ead167478 100644 --- a/src/pipecat/services/playht/tts.py +++ b/src/pipecat/services/playht/tts.py @@ -276,7 +276,8 @@ async def _connect_websocket(self): self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -289,7 +290,8 @@ async def _disconnect_websocket(self): logger.debug("Disconnecting from PlayHT") await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._request_id = None self._websocket = None @@ -360,7 +362,7 @@ async def _receive_messages(self): self._request_id = None elif "error" in msg: logger.error(f"{self} error: {msg}") - await self.push_error(ErrorFrame(f"{self} error: {msg['error']}")) + await self.push_error(ErrorFrame(error=f"{self} error: {msg['error']}")) except json.JSONDecodeError: logger.error(f"Invalid JSON message: {message}") @@ -402,7 +404,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self._get_websocket().send(json.dumps(tts_command)) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -412,8 +415,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield None except Exception as e: - logger.error(f"{self} error generating TTS: {e}") - yield ErrorFrame(f"{self} error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") class PlayHTHttpTTSService(TTSService): @@ -633,7 +636,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield frame except Exception as e: - logger.error(f"{self} error generating TTS: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() diff --git a/src/pipecat/services/rime/tts.py b/src/pipecat/services/rime/tts.py index fa3fa447dc..7bc4f64376 100644 --- a/src/pipecat/services/rime/tts.py +++ b/src/pipecat/services/rime/tts.py @@ -258,7 +258,8 @@ async def _connect_websocket(self): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -270,7 +271,8 @@ async def _disconnect_websocket(self): await self._websocket.send(json.dumps(self._build_eos_msg())) await self._websocket.close() except Exception as e: - logger.error(f"{self} error closing websocket: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._context_id = None self._websocket = None @@ -366,7 +368,7 @@ async def _receive_messages(self): logger.error(f"{self} error: {msg}") await self.push_frame(TTSStoppedFrame()) await self.stop_all_metrics() - await self.push_error(ErrorFrame(f"{self} error: {msg['message']}")) + await self.push_error(ErrorFrame(error=f"{self} error: {msg['message']}")) self._context_id = None async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): @@ -408,7 +410,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self._get_websocket().send(json.dumps(msg)) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -416,6 +419,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield None except Exception as e: logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) class RimeHttpTTSService(TTSService): @@ -564,8 +568,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield frame except Exception as e: - logger.exception(f"Error generating TTS: {e}") - yield ErrorFrame(error=f"Rime TTS error: {str(e)}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() diff --git a/src/pipecat/services/riva/stt.py b/src/pipecat/services/riva/stt.py index eddd3da9e6..68f68d7b0f 100644 --- a/src/pipecat/services/riva/stt.py +++ b/src/pipecat/services/riva/stt.py @@ -659,8 +659,8 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: yield ErrorFrame(f"Unexpected Riva response format: {str(ae)}") except Exception as e: - logger.exception(f"Riva Canary ASR error: {e}") - yield ErrorFrame(f"Riva Canary ASR error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") class ParakeetSTTService(RivaSTTService): diff --git a/src/pipecat/services/riva/tts.py b/src/pipecat/services/riva/tts.py index 1889c19dd8..6649f12401 100644 --- a/src/pipecat/services/riva/tts.py +++ b/src/pipecat/services/riva/tts.py @@ -23,6 +23,7 @@ from pydantic import BaseModel from pipecat.frames.frames import ( + ErrorFrame, Frame, TTSAudioRawFrame, TTSStartedFrame, @@ -156,6 +157,7 @@ def add_response(r): add_response(None) except Exception as e: logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) add_response(None) await self.start_ttfb_metrics() diff --git a/src/pipecat/services/sarvam/tts.py b/src/pipecat/services/sarvam/tts.py index 762776d50f..cbc5d2e140 100644 --- a/src/pipecat/services/sarvam/tts.py +++ b/src/pipecat/services/sarvam/tts.py @@ -255,7 +255,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: if response.status != 200: error_text = await response.text() logger.error(f"Sarvam API error: {error_text}") - await self.push_error(ErrorFrame(f"Sarvam API error: {error_text}")) + await self.push_error(ErrorFrame(error=f"Sarvam API error: {error_text}")) return response_data = await response.json() @@ -265,7 +265,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: # Decode base64 audio data if "audios" not in response_data or not response_data["audios"]: logger.error("No audio data received from Sarvam API") - await self.push_error(ErrorFrame("No audio data received")) + await self.push_error(ErrorFrame(error="No audio data received")) return # Get the first audio (there should be only one for single text input) @@ -287,7 +287,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: except Exception as e: logger.error(f"{self} exception: {e}") - await self.push_error(ErrorFrame(f"Error generating TTS: {e}")) + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: await self.stop_ttfb_metrics() yield TTSStoppedFrame() @@ -575,7 +575,8 @@ async def _disconnect(self): await self._disconnect_websocket() except Exception as e: - logger.error(f"Error during disconnect: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: # Reset state only after everything is cleaned up self._started = False @@ -599,7 +600,8 @@ async def _connect_websocket(self): await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} initialization error: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._websocket = None await self._call_event_handler("on_connection_error", f"{e}") @@ -615,8 +617,8 @@ async def _send_config(self): await self._websocket.send(json.dumps(config_message)) logger.debug("Configuration sent successfully") except Exception as e: - logger.error(f"Failed to send config: {str(e)}") - await self.push_frame(ErrorFrame(f"Failed to send config: {str(e)}")) + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) raise async def _disconnect_websocket(self): @@ -629,6 +631,7 @@ async def _disconnect_websocket(self): await self._websocket.close() except Exception as e: logger.error(f"{self} error closing websocket: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._started = False self._websocket = None @@ -658,7 +661,7 @@ async def _receive_messages(self): if "too long" in error_msg.lower() or "timeout" in error_msg.lower(): logger.warning("Connection timeout detected, service may need restart") - await self.push_frame(ErrorFrame(f"TTS Error: {error_msg}")) + await self.push_frame(ErrorFrame(error=f"TTS Error: {error_msg}")) async def _keepalive_task_handler(self): """Handle keepalive messages to maintain WebSocket connection.""" @@ -714,7 +717,8 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: await self._send_text(text) await self.start_tts_usage_metrics(text) except Exception as e: - logger.error(f"{self} error sending message: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) yield TTSStoppedFrame() await self._disconnect() await self._connect() @@ -722,3 +726,4 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: yield None except Exception as e: logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) diff --git a/src/pipecat/services/soniox/stt.py b/src/pipecat/services/soniox/stt.py index 1cf2d51948..e3864d4fe1 100644 --- a/src/pipecat/services/soniox/stt.py +++ b/src/pipecat/services/soniox/stt.py @@ -296,8 +296,8 @@ async def _keepalive_task_handler(self): # Expected when closing the connection logger.debug("WebSocket connection closed, keepalive task stopped.") except Exception as e: - logger.error(f"{self} error (_keepalive_task_handler): {e}") - await self.push_error(ErrorFrame(f"{self} error (_keepalive_task_handler): {e}")) + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) async def _receive_task_handler(self): if not self._websocket: @@ -378,7 +378,7 @@ async def send_endpoint_transcript(): ) await self.push_error( ErrorFrame( - f"{self} error: {error_code} (_receive_task_handler) - {error_message}" + error=f"{self} error: {error_code} (_receive_task_handler) - {error_message}" ) ) @@ -394,5 +394,5 @@ async def send_endpoint_transcript(): # Expected when closing the connection. pass except Exception as e: - logger.error(f"{self} error: {e}") - await self.push_error(ErrorFrame(f"{self} error: {e}")) + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) diff --git a/src/pipecat/services/speechmatics/stt.py b/src/pipecat/services/speechmatics/stt.py index 901edb0e8a..9e61972c51 100644 --- a/src/pipecat/services/speechmatics/stt.py +++ b/src/pipecat/services/speechmatics/stt.py @@ -467,8 +467,8 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: await self._client.send_audio(audio) yield None except Exception as e: - logger.error(f"Speechmatics error: {e}") - yield ErrorFrame(f"Speechmatics error: {e}", fatal=False) + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") await self._disconnect() def update_params( @@ -514,6 +514,8 @@ async def send_message(self, message: ClientMessageType | str, **kwargs: Any) -> self._client.send_message(payload), self.get_event_loop() ) except Exception as e: + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) raise RuntimeError(f"error sending message to STT: {e}") async def _connect(self) -> None: @@ -579,7 +581,8 @@ def _evt_on_speakers_result(message: dict[str, Any]): logger.debug(f"{self} Connected to Speechmatics STT service") await self._call_event_handler("on_connected") except Exception as e: - logger.error(f"{self} Error connecting to Speechmatics: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) self._client = None async def _disconnect(self) -> None: @@ -593,7 +596,8 @@ async def _disconnect(self) -> None: except asyncio.TimeoutError: logger.warning(f"{self} Timeout while closing Speechmatics client connection") except Exception as e: - logger.error(f"{self} Error closing Speechmatics client: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) finally: self._client = None await self._call_event_handler("on_disconnected") diff --git a/src/pipecat/services/ultravox/stt.py b/src/pipecat/services/ultravox/stt.py index 987593f02b..a732e4bdba 100644 --- a/src/pipecat/services/ultravox/stt.py +++ b/src/pipecat/services/ultravox/stt.py @@ -246,7 +246,8 @@ async def _warm_up_model(self): logger.info("Model warm-up completed successfully") except Exception as e: - logger.warning(f"Model warm-up failed: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) def _generate_silent_audio(self, sample_rate=16000, duration_sec=1.0): """Generate silent audio as a numpy array. @@ -376,7 +377,7 @@ async def _process_audio_buffer(self) -> AsyncGenerator[Frame, None]: if arr.size > 0: # Check if array is not empty audio_arrays.append(arr) except Exception as e: - logger.error(f"Error processing bytes audio frame: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) # Handle numpy array data elif isinstance(f.audio, np.ndarray): if f.audio.size > 0: # Check if array is not empty @@ -436,14 +437,15 @@ async def _process_audio_buffer(self) -> AsyncGenerator[Frame, None]: yield LLMFullResponseEndFrame() except Exception as e: - logger.error(f"Error generating text from model: {e}") - yield ErrorFrame(f"Error generating text: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") else: - logger.warning("No model available for text generation") + logger.error("No model available for text generation") yield ErrorFrame("No model available for text generation") except Exception as e: - logger.error(f"Error processing audio buffer: {e}") + logger.error(f"{self} exception: {e}") + await self.push_error(ErrorFrame(error=f"{self} error: {e}")) import traceback logger.error(traceback.format_exc()) diff --git a/src/pipecat/services/websocket_service.py b/src/pipecat/services/websocket_service.py index 17a9113667..f9799b9c37 100644 --- a/src/pipecat/services/websocket_service.py +++ b/src/pipecat/services/websocket_service.py @@ -94,7 +94,7 @@ async def _receive_task_handler(self, report_error: Callable[[ErrorFrame], Await if self._reconnect_on_error: retry_count += 1 if retry_count >= MAX_RETRIES: - await report_error(ErrorFrame(message, fatal=True)) + await report_error(ErrorFrame(message)) break logger.warning(f"{self} connection error, will retry: {e}") diff --git a/src/pipecat/services/whisper/base_stt.py b/src/pipecat/services/whisper/base_stt.py index 3d9151e379..e16ce8f63a 100644 --- a/src/pipecat/services/whisper/base_stt.py +++ b/src/pipecat/services/whisper/base_stt.py @@ -228,8 +228,8 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: logger.warning("Received empty transcription from API") except Exception as e: - logger.exception(f"Exception during transcription: {e}") - yield ErrorFrame(f"Error during transcription: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") async def _transcribe(self, audio: bytes) -> Transcription: """Transcribe audio data to text. diff --git a/src/pipecat/services/whisper/stt.py b/src/pipecat/services/whisper/stt.py index 353f240e2d..a370ac6cee 100644 --- a/src/pipecat/services/whisper/stt.py +++ b/src/pipecat/services/whisper/stt.py @@ -517,5 +517,5 @@ async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: ) except Exception as e: - logger.exception(f"MLX Whisper transcription error: {e}") - yield ErrorFrame(f"MLX Whisper transcription error: {str(e)}") + logger.error(f"{self} exception: {e}") + yield ErrorFrame(error=f"{self} error: {e}") diff --git a/src/pipecat/services/xtts/tts.py b/src/pipecat/services/xtts/tts.py index 844e0fbaf8..62c483ef54 100644 --- a/src/pipecat/services/xtts/tts.py +++ b/src/pipecat/services/xtts/tts.py @@ -161,7 +161,7 @@ async def start(self, frame: StartFrame): ) await self.push_error( ErrorFrame( - f"Error error getting studio speakers (status: {r.status}, error: {text})" + error=f"Error getting studio speakers (status: {r.status}, error: {text})" ) ) return @@ -202,7 +202,7 @@ async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: if r.status != 200: text = await r.text() logger.error(f"{self} error getting audio (status: {r.status}, error: {text})") - yield ErrorFrame(f"Error getting audio (status: {r.status}, error: {text})") + yield ErrorFrame(error=f"Error getting audio (status: {r.status}, error: {text})") return await self.start_tts_usage_metrics(text)