diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ab7c71217..a612c66cc5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -145,6 +145,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `DailyTransport` triggers `on_error` event if transcription can't be started + or stopped. + +- `DailyTransport` updates: `start_dialout()` now returns two values: + `session_id` and `error`. `start_recording()` now returns two values: + `stream_id` and `error`. + - Updated `daily-python` to 0.21.0. - `SimliVideoService` now accepts `api_key` and `face_id` parameters directly, diff --git a/src/pipecat/transports/daily/transport.py b/src/pipecat/transports/daily/transport.py index 2e48c4d2e8..18c36ee56a 100644 --- a/src/pipecat/transports/daily/transport.py +++ b/src/pipecat/transports/daily/transport.py @@ -16,7 +16,7 @@ from concurrent.futures import CancelledError as FuturesCancelledError from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -from typing import Any, Awaitable, Callable, Dict, Mapping, Optional +from typing import Any, Awaitable, Callable, Dict, Mapping, Optional, Tuple import aiohttp from loguru import logger @@ -419,6 +419,11 @@ class DailyAudioTrack: track: CustomAudioTrack +# This is just a type alias for the errors returned by daily-python. Right now +# they are just a string. +CallClientError = str + + class DailyTransportClient(EventHandler): """Core client for interacting with Daily's API. @@ -553,14 +558,17 @@ def out_sample_rate(self) -> int: async def send_message( self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame - ): + ) -> Optional[CallClientError]: """Send an application message to participants. Args: frame: The message frame to send. + + Returns: + error: An error description or None. """ if not self._joined: - return + return "Unable to send messages before joining." participant_id = None if isinstance( @@ -572,7 +580,7 @@ async def send_message( self._client.send_app_message( frame.message, participant_id, completion=completion_callback(future) ) - await future + return await future async def read_next_audio_frame(self) -> Optional[InputAudioRawFrame]: """Reads the next 20ms audio frame from the virtual speaker.""" @@ -754,9 +762,6 @@ async def join(self): logger.info(f"Joined {self._room_url}") - if self._params.transcription_enabled: - await self.start_transcription(self._params.transcription_settings) - await self._callbacks.on_joined(data) self._joined_event.set() @@ -842,9 +847,6 @@ async def leave(self): # Call callback before leaving. await self._callbacks.on_before_leave() - if self._params.transcription_enabled: - await self.stop_transcription() - # Remove any custom tracks, if any. for track_name, _ in self._custom_audio_tracks.items(): await self.remove_custom_audio_track(track_name) @@ -873,7 +875,7 @@ def _cleanup(self): self._client.release() self._client = None - def participants(self): + def participants(self) -> Mapping[str, Any]: """Get current participants in the room. Returns: @@ -881,7 +883,7 @@ def participants(self): """ return self._client.participants() - def participant_counts(self): + def participant_counts(self) -> Mapping[str, Any]: """Get participant count information. Returns: @@ -889,165 +891,173 @@ def participant_counts(self): """ return self._client.participant_counts() - async def start_dialout(self, settings): + async def start_dialout(self, settings) -> Tuple[str, Optional[CallClientError]]: """Start a dial-out call to a phone number. Args: settings: Dial-out configuration settings. - """ - logger.debug(f"Starting dialout: settings={settings}") + Returns: + session_id: Dail-out session ID. + error: An error description or None. + """ future = self._get_event_loop().create_future() self._client.start_dialout(settings, completion=completion_callback(future)) - error = await future - if error: - logger.error(f"Unable to start dialout: {error}") + return await future - async def stop_dialout(self, participant_id): + async def stop_dialout(self, participant_id) -> Optional[CallClientError]: """Stop a dial-out call for a specific participant. Args: participant_id: ID of the participant to stop dial-out for. - """ - logger.debug(f"Stopping dialout: participant_id={participant_id}") + Returns: + error: An error description or None. + """ future = self._get_event_loop().create_future() self._client.stop_dialout(participant_id, completion=completion_callback(future)) - error = await future - if error: - logger.error(f"Unable to stop dialout: {error}") + return await future - async def send_dtmf(self, settings): + async def send_dtmf(self, settings) -> Optional[CallClientError]: """Send DTMF tones during a call. Args: settings: DTMF settings including tones and target session. + + Returns: + error: An error description or None. """ session_id = settings.get("sessionId") or self._dial_out_session_id if not session_id: - logger.error("Unable to send DTMF: 'sessionId' is not set") - return + return "Can't send DTMF if 'sessionId' is not set" # Update 'sessionId' field. settings["sessionId"] = session_id future = self._get_event_loop().create_future() self._client.send_dtmf(settings, completion=completion_callback(future)) - await future + return await future - async def sip_call_transfer(self, settings): + async def sip_call_transfer(self, settings) -> Optional[CallClientError]: """Transfer a SIP call to another destination. Args: settings: SIP call transfer settings. + + Returns: + error: An error description or None. """ session_id = ( settings.get("sessionId") or self._dial_out_session_id or self._dial_in_session_id ) if not session_id: - logger.error("Unable to transfer SIP call: 'sessionId' is not set") - return + return "Can't transfer SIP call if 'sessionId' is not set" # Update 'sessionId' field. settings["sessionId"] = session_id future = self._get_event_loop().create_future() self._client.sip_call_transfer(settings, completion=completion_callback(future)) - await future + return await future - async def sip_refer(self, settings): + async def sip_refer(self, settings) -> Optional[CallClientError]: """Send a SIP REFER request. Args: settings: SIP REFER settings. + + Returns: + error: An error description or None. """ future = self._get_event_loop().create_future() self._client.sip_refer(settings, completion=completion_callback(future)) - await future + return await future - async def start_recording(self, streaming_settings, stream_id, force_new): + async def start_recording( + self, streaming_settings, stream_id, force_new + ) -> Tuple[str, Optional[CallClientError]]: """Start recording the call. Args: streaming_settings: Recording configuration settings. stream_id: Unique identifier for the recording stream. force_new: Whether to force a new recording session. - """ - logger.debug( - f"Starting recording: stream_id={stream_id} force_new={force_new} settings={streaming_settings}" - ) + Returns: + stream_id: Unique identifier for the recording stream. + error: An error description or None. + """ future = self._get_event_loop().create_future() self._client.start_recording( streaming_settings, stream_id, force_new, completion=completion_callback(future) ) - error = await future - if error: - logger.error(f"Unable to start recording: {error}") + return await future - async def stop_recording(self, stream_id): + async def stop_recording(self, stream_id) -> Optional[CallClientError]: """Stop recording the call. Args: stream_id: Unique identifier for the recording stream to stop. - """ - logger.debug(f"Stopping recording: stream_id={stream_id}") + Returns: + error: An error description or None. + """ future = self._get_event_loop().create_future() self._client.stop_recording(stream_id, completion=completion_callback(future)) - error = await future - if error: - logger.error(f"Unable to stop recording: {error}") + return await future - async def start_transcription(self, settings): + async def start_transcription(self, settings) -> Optional[CallClientError]: """Start transcription for the call. Args: settings: Transcription configuration settings. + + Returns: + error: An error description or None. """ if not self._token: - logger.warning("Transcription can't be started without a room token") - return - - logger.debug(f"Starting transcription: settings={settings}") + return "Transcription can't be started without a room token" future = self._get_event_loop().create_future() self._client.start_transcription( settings=self._params.transcription_settings.model_dump(exclude_none=True), completion=completion_callback(future), ) - error = await future - if error: - logger.error(f"Unable to start transcription: {error}") + return await future - async def stop_transcription(self): - """Stop transcription for the call.""" - if not self._token: - return + async def stop_transcription(self) -> Optional[CallClientError]: + """Stop transcription for the call. - logger.debug(f"Stopping transcription") + Returns: + error: An error description or None. + """ + if not self._token: + return "Transcription can't be stopped without a room token" future = self._get_event_loop().create_future() self._client.stop_transcription(completion=completion_callback(future)) - error = await future - if error: - logger.error(f"Unable to stop transcription: {error}") + return await future - async def send_prebuilt_chat_message(self, message: str, user_name: Optional[str] = None): + async def send_prebuilt_chat_message( + self, message: str, user_name: Optional[str] = None + ) -> Optional[CallClientError]: """Send a chat message to Daily's Prebuilt main room. Args: message: The chat message to send. user_name: Optional user name that will appear as sender of the message. + + Returns: + error: An error description or None. """ if not self._joined: - return + return "Can't send message if not joined" future = self._get_event_loop().create_future() self._client.send_prebuilt_chat_message( message, user_name=user_name, completion=completion_callback(future) ) - await future + return await future async def capture_participant_transcription(self, participant_id: str): """Enable transcription capture for a specific participant. @@ -1167,38 +1177,51 @@ async def add_custom_audio_track(self, track_name: str) -> DailyAudioTrack: return track - async def remove_custom_audio_track(self, track_name: str): + async def remove_custom_audio_track(self, track_name: str) -> Optional[CallClientError]: """Remove a custom audio track. Args: track_name: Name of the custom audio track to remove. + + Returns: + error: An error description or None. """ future = self._get_event_loop().create_future() self._client.remove_custom_audio_track( track_name=track_name, completion=completion_callback(future), ) - await future + return await future - async def update_transcription(self, participants=None, instance_id=None): + async def update_transcription( + self, participants=None, instance_id=None + ) -> Optional[CallClientError]: """Update transcription settings for specific participants. Args: participants: List of participant IDs to enable transcription for. instance_id: Optional transcription instance ID. + + Returns: + error: An error description or None. """ future = self._get_event_loop().create_future() self._client.update_transcription( participants, instance_id, completion=completion_callback(future) ) - await future + return await future - async def update_subscriptions(self, participant_settings=None, profile_settings=None): + async def update_subscriptions( + self, participant_settings=None, profile_settings=None + ) -> Optional[CallClientError]: """Update media subscription settings. Args: participant_settings: Per-participant subscription settings. profile_settings: Global subscription profile settings. + + Returns: + error: An error description or None. """ future = self._get_event_loop().create_future() self._client.update_subscriptions( @@ -1206,32 +1229,42 @@ async def update_subscriptions(self, participant_settings=None, profile_settings profile_settings=profile_settings, completion=completion_callback(future), ) - await future + return await future - async def update_publishing(self, publishing_settings: Mapping[str, Any]): + async def update_publishing( + self, publishing_settings: Mapping[str, Any] + ) -> Optional[CallClientError]: """Update media publishing settings. Args: publishing_settings: Publishing configuration settings. + + Returns: + error: An error description or None. """ future = self._get_event_loop().create_future() self._client.update_publishing( publishing_settings=publishing_settings, completion=completion_callback(future), ) - await future + return await future - async def update_remote_participants(self, remote_participants: Mapping[str, Any]): + async def update_remote_participants( + self, remote_participants: Mapping[str, Any] + ) -> Optional[CallClientError]: """Update settings for remote participants. Args: remote_participants: Remote participant configuration settings. + + Returns: + error: An error description or None. """ future = self._get_event_loop().create_future() self._client.update_remote_participants( remote_participants=remote_participants, completion=completion_callback(future) ) - await future + return await future # # @@ -1922,7 +1955,9 @@ async def send_message( Args: frame: The transport message frame to send. """ - await self._client.send_message(frame) + error = await self._client.send_message(frame) + if error: + logger.error(f"Unable to send message: {error}") async def register_video_destination(self, destination: str): """Register a video output destination. @@ -2166,7 +2201,7 @@ async def send_audio(self, frame: OutputAudioRawFrame): if self._output: await self._output.queue_frame(frame, FrameDirection.DOWNSTREAM) - def participants(self): + def participants(self) -> Mapping[str, Any]: """Get current participants in the room. Returns: @@ -2174,7 +2209,7 @@ def participants(self): """ return self._client.participants() - def participant_counts(self): + def participant_counts(self) -> Mapping[str, Any]: """Get participant count information. Returns: @@ -2182,76 +2217,155 @@ def participant_counts(self): """ return self._client.participant_counts() - async def start_dialout(self, settings=None): + async def start_dialout(self, settings=None) -> Tuple[str, Optional[CallClientError]]: """Start a dial-out call to a phone number. Args: settings: Dial-out configuration settings. + + Returns: + session_id: Dail-out session ID. + error: An error description or None. """ - await self._client.start_dialout(settings) + logger.debug(f"Starting dialout: settings={settings}") + + session_id, error = await self._client.start_dialout(settings) + if error: + logger.error(f"Unable to start dialout: {error}") + return session_id, error - async def stop_dialout(self, participant_id): + async def stop_dialout(self, participant_id) -> Optional[CallClientError]: """Stop a dial-out call for a specific participant. Args: participant_id: ID of the participant to stop dial-out for. + + Returns: + error: An error description or None. """ - await self._client.stop_dialout(participant_id) + logger.debug(f"Stopping dialout: participant_id={participant_id}") + + error = await self._client.stop_dialout(participant_id) + if error: + logger.error(f"Unable to stop dialout: {error}") + return error - async def sip_call_transfer(self, settings): + async def sip_call_transfer(self, settings) -> Optional[CallClientError]: """Transfer a SIP call to another destination. Args: settings: SIP call transfer settings. + + Returns: + error: An error description or None. """ - await self._client.sip_call_transfer(settings) + logger.debug(f"Staring SIP call transfer: settings={settings}") - async def sip_refer(self, settings): + error = await self._client.sip_call_transfer(settings) + if error: + logger.error(f"Unable to transfer SIP call: {error}") + return error + + async def sip_refer(self, settings) -> Optional[CallClientError]: """Send a SIP REFER request. Args: settings: SIP REFER settings. + + Returns: + error: An error description or None. """ - await self._client.sip_refer(settings) + logger.debug(f"Staring SIP REFER: settings={settings}") + + error = await self._client.sip_refer(settings) + if error: + logger.error(f"Unable to perform SIP REFER: {error}") + return error - async def start_recording(self, streaming_settings=None, stream_id=None, force_new=None): + async def start_recording( + self, streaming_settings=None, stream_id=None, force_new=None + ) -> Tuple[str, Optional[CallClientError]]: """Start recording the call. Args: streaming_settings: Recording configuration settings. stream_id: Unique identifier for the recording stream. force_new: Whether to force a new recording session. + + Returns: + stream_id: Unique identifier for the recording stream. + error: An error description or None. """ - await self._client.start_recording(streaming_settings, stream_id, force_new) + logger.debug( + f"Starting recording: stream_id={stream_id} force_new={force_new} settings={streaming_settings}" + ) - async def stop_recording(self, stream_id=None): + r_id, error = await self._client.start_recording(streaming_settings, stream_id, force_new) + if error: + logger.error(f"Unable to start recording: {error}") + return r_id, error + + async def stop_recording(self, stream_id=None) -> Optional[CallClientError]: """Stop recording the call. Args: stream_id: Unique identifier for the recording stream to stop. + + Returns: + error: An error description or None. """ - await self._client.stop_recording(stream_id) + logger.debug(f"Stopping recording: stream_id={stream_id}") - async def start_transcription(self, settings=None): + error = await self._client.stop_recording(stream_id) + if error: + logger.error(f"Unable to stop recording: {error}") + return error + + async def start_transcription(self, settings=None) -> Optional[CallClientError]: """Start transcription for the call. Args: settings: Transcription configuration settings. + + Returns: + error: An error description or None. """ - await self._client.start_transcription(settings) + logger.debug(f"Starting transcription: settings={settings}") - async def stop_transcription(self): - """Stop transcription for the call.""" - await self._client.stop_transcription() + error = await self._client.start_transcription(settings) + if error: + logger.error(f"Unable to start transcription: {error}") + return error + + async def stop_transcription(self) -> Optional[CallClientError]: + """Stop transcription for the call. + + Returns: + error: An error description or None. + """ + logger.debug(f"Stopping transcription") + + error = await self._client.stop_transcription() + if error: + logger.error(f"Unable to stop transcription: {error}") + return error - async def send_prebuilt_chat_message(self, message: str, user_name: Optional[str] = None): + async def send_prebuilt_chat_message( + self, message: str, user_name: Optional[str] = None + ) -> Optional[CallClientError]: """Send a chat message to Daily's Prebuilt main room. Args: message: The chat message to send. user_name: Optional user name that will appear as sender of the message. + + Returns: + error: An error description or None. """ - await self._client.send_prebuilt_chat_message(message, user_name) + error = await self._client.send_prebuilt_chat_message(message, user_name) + if error: + logger.error(f"Unable to send prebuilt chat message: {error}") + return error async def capture_participant_transcription(self, participant_id: str): """Enable transcription capture for a specific participant. @@ -2297,32 +2411,66 @@ async def capture_participant_video( participant_id, framerate, video_source, color_format ) - async def update_publishing(self, publishing_settings: Mapping[str, Any]): + async def update_publishing( + self, publishing_settings: Mapping[str, Any] + ) -> Optional[CallClientError]: """Update media publishing settings. Args: publishing_settings: Publishing configuration settings. + + Returns: + error: An error description or None. """ - await self._client.update_publishing(publishing_settings=publishing_settings) + logger.debug(f"Updating publishing settings: settings={publishing_settings}") - async def update_subscriptions(self, participant_settings=None, profile_settings=None): + error = await self._client.update_publishing(publishing_settings=publishing_settings) + if error: + logger.error(f"Unable to update publishing settings: {error}") + return error + + async def update_subscriptions( + self, participant_settings=None, profile_settings=None + ) -> Optional[CallClientError]: """Update media subscription settings. Args: participant_settings: Per-participant subscription settings. profile_settings: Global subscription profile settings. + + Returns: + error: An error description or None. """ - await self._client.update_subscriptions( + logger.debug( + f"Updating subscriptions: participant_settings={participant_settings} profile_settings={profile_settings}" + ) + + error = await self._client.update_subscriptions( participant_settings=participant_settings, profile_settings=profile_settings ) + if error: + logger.error(f"Unable to update subscription settings: {error}") + return error - async def update_remote_participants(self, remote_participants: Mapping[str, Any]): + async def update_remote_participants( + self, remote_participants: Mapping[str, Any] + ) -> Optional[CallClientError]: """Update settings for remote participants. Args: remote_participants: Remote participant configuration settings. + + Returns: + error: An error description or None. """ - await self._client.update_remote_participants(remote_participants=remote_participants) + logger.debug(f"Updating remote participants: remote_participants={remote_participants}") + + error = await self._client.update_remote_participants( + remote_participants=remote_participants + ) + if error: + logger.error(f"Unable to update remote participants: {error}") + return error async def _on_active_speaker_changed(self, participant: Any): """Handle active speaker change events.""" @@ -2330,6 +2478,12 @@ async def _on_active_speaker_changed(self, participant: Any): async def _on_joined(self, data): """Handle room joined events.""" + if self._params.transcription_enabled: + # We report an error because we are starting transcription + # internally and if it fails we need to know. + error = await self.start_transcription(self._params.transcription_settings) + if error: + await self._on_error(f"Unable to start transcription: {error}") await self._call_event_handler("on_joined", data) async def _on_left(self): @@ -2338,6 +2492,12 @@ async def _on_left(self): async def _on_before_leave(self): """Handle before leave room events.""" + if self._params.transcription_enabled: + # We report an error because we are stopping transcription + # internally and if it fails we need to know. + error = await self.stop_transcription() + if error: + await self._on_error(f"Unable to stop transcription: {error}") await self._call_event_handler("on_before_leave") async def _on_error(self, error):