Skip to content

Commit

Permalink
Merge pull request #3 from speechmatics/v0.0.3
Browse files Browse the repository at this point in the history
Fix latency and improved audio playback on systems running Python 3.12+
  • Loading branch information
dumitrugutu authored Oct 23, 2024
2 parents 23bf49e + 08be7bc commit 84076ef
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 32 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.0.3] - 2024-10-23

### Changed

- PyAudio class is instantiated only when the client is started directly from the CLI.
- Simplified microphone example

### Fixed

- Choppy audio playback on some systems using Python 3.12+
- Latency issues on some systems using Python 3.12+

## [0.0.2] - 2024-10-17

### Added
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.2
0.0.3
47 changes: 25 additions & 22 deletions examples/stream_from_microphone.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

AUTH_TOKEN = "YOUR TOKEN HERE"


# Create a websocket client
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
Expand All @@ -29,57 +28,61 @@
)
)

# Create a buffer to store binary messages sent from the server
audio_buffer = io.BytesIO()

# Create an asyncio queue to store audio data
audio_queue = asyncio.Queue()


# Create callback function which adds binary messages to audio buffer
# Create a callback function to add binary messages to the audio queue
def binary_msg_handler(msg: bytes):
if isinstance(msg, (bytes, bytearray)):
audio_buffer.write(msg)
audio_queue.put_nowait(msg)


# Register the callback to be called when the client receives an audio message from the server
# Register the callback to be called when the client receives an audio message
client.add_event_handler(ServerMessageType.audio, binary_msg_handler)


async def audio_playback():
"""Read from buffer and play audio back to the user"""
"""Continuously read from the audio queue and play audio back to the user."""
p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16, channels=1, rate=16000, output=True)
chunk_size = 1024
player_stream = p.open(format=pyaudio.paInt16, channels=1, rate=16000, output=True)

try:
while True:
# Get the current value from the buffer
audio_to_play = audio_buffer.getvalue()
# Only proceed if there is audio data to play
if audio_to_play:
# Write the audio to the stream
stream.write(audio_to_play)
audio_buffer.seek(0)
audio_buffer.truncate(0)
# Pause briefly before checking the buffer again
await asyncio.sleep(0.05)
# Create a new playback buffer for each iteration
playback_buffer = io.BytesIO()

# Fill the buffer until it has enough data
while playback_buffer.tell() < chunk_size:
playback_buffer.write(await audio_queue.get())

# Write the full buffer contents to the player stream
player_stream.write(playback_buffer.getvalue())
finally:
stream.close()
stream.stop_stream()
player_stream.stop_stream()
player_stream.close()
p.terminate()


async def main():
"""Main function to run both the WebSocket client and audio playback."""
tasks = [
# Use the websocket to connect to Flow Service and start a conversation
# Start the WebSocket client and conversation
asyncio.create_task(
client.run(
interactions=[Interaction(sys.stdin.buffer)],
audio_settings=AudioSettings(),
conversation_config=ConversationConfig(),
)
),
# Run audio playback handler which streams audio from audio buffer
# Start the audio playback handler
asyncio.create_task(audio_playback()),
]

await asyncio.gather(*tasks)


# Run the main event loop
asyncio.run(main())
22 changes: 13 additions & 9 deletions speechmatics_flow/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def __init__(
self.conversation_ended_wait_timeout = 5
self._session_needs_closing = False
self._audio_buffer = None
self._pyaudio = pyaudio.PyAudio

# The following asyncio fields are fully instantiated in
# _init_synchronization_primitives
Expand All @@ -86,7 +85,6 @@ async def _init_synchronization_primitives(self):
"""
self._conversation_started = asyncio.Event()
self._conversation_ended = asyncio.Event()
self._pyaudio = pyaudio.PyAudio()
self._buffer_semaphore = asyncio.BoundedSemaphore(
self.connection_settings.message_buffer_size
)
Expand Down Expand Up @@ -165,7 +163,6 @@ async def _consumer(self, message, from_cli: False):
:raises ForceEndSession: If this was raised by the user's event
handler.
"""
LOGGER.debug(message)
if isinstance(message, (bytes, bytearray)):
# add an audio message to local buffer only when running from cli
if from_cli:
Expand All @@ -174,6 +171,7 @@ async def _consumer(self, message, from_cli: False):
# so we need to set it here for event_handler to work
message_type = ServerMessageType.audio
else:
LOGGER.debug(message)
message = json.loads(message)
message_type = message.get("message")

Expand All @@ -200,14 +198,15 @@ async def _consumer(self, message, from_cli: False):
raise ConversationError(message["reason"])

async def _read_from_microphone(self):
_pyaudio = pyaudio.PyAudio()
print(
f"Default input device: {self._pyaudio.get_default_input_device_info()['name']}"
f"Default input device: {_pyaudio.get_default_input_device_info()['name']}"
)
print(
f"Default output device: {self._pyaudio.get_default_output_device_info()['name']}"
f"Default output device: {_pyaudio.get_default_output_device_info()['name']}"
)
print("Start speaking...")
stream = self._pyaudio.open(
stream = _pyaudio.open(
format=pyaudio.paInt16,
channels=1,
rate=self.audio_settings.sample_rate,
Expand All @@ -229,13 +228,15 @@ async def _read_from_microphone(self):
self.seq_no += 1
self._call_middleware(ClientMessageType.AddAudio, audio_chunk, True)
await self.websocket.send(audio_chunk)
# send audio at a constant rate
await asyncio.sleep(0.01)
except KeyboardInterrupt:
await self.websocket.send(self._end_of_audio())
finally:
await self._wait_for_conversation_ended()
stream.stop_stream()
stream.close()
self._pyaudio.terminate()
_pyaudio.terminate()

async def _consumer_handler(self, from_cli: False):
"""
Expand Down Expand Up @@ -295,7 +296,8 @@ async def _playback_handler(self):
"""
Reads audio binary messages from the playback buffer and plays them to the user.
"""
stream = self._pyaudio.open(
_pyaudio = pyaudio.PyAudio()
stream = _pyaudio.open(
format=pyaudio.paInt16,
channels=1,
rate=self.audio_settings.sample_rate,
Expand All @@ -309,13 +311,15 @@ async def _playback_handler(self):
audio_message = await self._audio_buffer.get()
stream.write(audio_message)
self._audio_buffer.task_done()
# read from buffer at a constant rate
await asyncio.sleep(0.005)
except Exception as e:
LOGGER.error(f"Error during audio playback: {e}")
raise e
finally:
stream.close()
stream.stop_stream()
self._pyaudio.terminate()
_pyaudio.terminate()

def _call_middleware(self, event_name, *args):
"""
Expand Down

0 comments on commit 84076ef

Please sign in to comment.