Skip to content

Commit

Permalink
fix(client): manage stop events and handle errors correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
janaab11 committed Jan 4, 2025
1 parent f2c3144 commit b6d6bc6
Showing 1 changed file with 77 additions and 38 deletions.
115 changes: 77 additions & 38 deletions src/diart/console/client.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,60 @@
import argparse
from pathlib import Path
from threading import Thread
from threading import Event, Thread
from typing import Optional, Text

import rx.operators as ops
from websocket import WebSocket
from websocket import WebSocket, WebSocketException

from diart import argdoc
from diart import sources as src
from diart import utils


def send_audio(ws: WebSocket, source: Text, step: float, sample_rate: int):
# Create audio source
source_components = source.split(":")
if source_components[0] != "microphone":
audio_source = src.FileAudioSource(source, sample_rate, block_duration=step)
else:
device = int(source_components[1]) if len(source_components) > 1 else None
audio_source = src.MicrophoneAudioSource(step, device)
def send_audio(
ws: WebSocket, source: Text, step: float, sample_rate: int, stop_event: Event
):
try:
# Create audio source
source_components = source.split(":")
if source_components[0] != "microphone":
audio_source = src.FileAudioSource(source, sample_rate, block_duration=step)
else:
device = int(source_components[1]) if len(source_components) > 1 else None
audio_source = src.MicrophoneAudioSource(step, device)

# Encode audio, then send through websocket
audio_source.stream.pipe(ops.map(utils.encode_audio)).subscribe_(ws.send)
# Encode audio, then send through websocket
def on_next(data):
if not stop_event.is_set():
try:
ws.send(utils.encode_audio(data))
except WebSocketException:
stop_event.set()

# Start reading audio
audio_source.read()
audio_source.stream.subscribe_(on_next)

# Start reading audio
audio_source.read()
except Exception as e:
print(f"Error in send_audio: {e}")
stop_event.set()

def receive_audio(ws: WebSocket, output: Optional[Path]):
while True:
message = ws.recv()
print(f"Received: {message}", end="")
if output is not None:
with open(output, "a") as file:
file.write(message)

def receive_audio(ws: WebSocket, output: Optional[Path], stop_event: Event):
try:
while not stop_event.is_set():
try:
message = ws.recv()
print(f"Received: {message}", end="")
if output is not None:
with open(output, "a") as file:
file.write(message)
except WebSocketException:
break
except Exception as e:
print(f"Error in receive_audio: {e}")
finally:
stop_event.set()


def run():
Expand Down Expand Up @@ -65,23 +86,41 @@ def run():

# Run websocket client
ws = WebSocket()
ws.connect(f"ws://{args.host}:{args.port}")

# Wait for READY signal from server
print("Waiting for server to be ready...", end="", flush=True)
while True:
message = ws.recv()
if message.strip() == "READY":
print(" OK")
break
print(f"\nUnexpected message while waiting for READY: {message}")

sender = Thread(
target=send_audio, args=[ws, args.source, args.step, args.sample_rate]
)
receiver = Thread(target=receive_audio, args=[ws, args.output_file])
sender.start()
receiver.start()
stop_event = Event()

try:
ws.connect(f"ws://{args.host}:{args.port}")

# Wait for READY signal from server
print("Waiting for server to be ready...", end="", flush=True)
while True:
try:
message = ws.recv()
if message.strip() == "READY":
print(" OK")
break
print(f"\nUnexpected message while waiting for READY: {message}")
except WebSocketException as e:
print(f"\nError while waiting for server: {e}")
return

sender = Thread(
target=send_audio,
args=[ws, args.source, args.step, args.sample_rate, stop_event],
)
receiver = Thread(target=receive_audio, args=[ws, args.output_file, stop_event])

sender.start()
receiver.start()

except Exception as e:
print(f"Error: {e}")
stop_event.set()
finally:
try:
ws.close()
except:
pass


if __name__ == "__main__":
Expand Down

0 comments on commit b6d6bc6

Please sign in to comment.