diff --git a/tests/realtimestt_test_hotkeys_v2.py b/tests/realtimestt_test_hotkeys_v2.py
new file mode 100644
index 0000000..2f051db
--- /dev/null
+++ b/tests/realtimestt_test_hotkeys_v2.py
@@ -0,0 +1,451 @@
+EXTENDED_LOGGING = False
+
+if __name__ == '__main__':
+
+    import subprocess
+    import sys
+    import threading
+    import time
+
+    def install_rich():
+        subprocess.check_call([sys.executable, "-m", "pip", "install", "rich"])
+
+    try:
+        import rich
+    except ImportError:
+        user_input = input("This demo needs the 'rich' library, which is not installed.\nDo you want to install it now? (y/n): ")
+        if user_input.lower() == 'y':
+            try:
+                install_rich()
+                import rich
+                print("Successfully installed 'rich'.")
+            except Exception as e:
+                print(f"An error occurred while installing 'rich': {e}")
+                sys.exit(1)
+        else:
+            print("The program requires the 'rich' library to run. Exiting...")
+            sys.exit(1)
+
+    import keyboard
+    import pyperclip
+
+    if EXTENDED_LOGGING:
+        import logging
+        logging.basicConfig(level=logging.DEBUG)
+
+    from rich.console import Console
+    from rich.live import Live
+    from rich.text import Text
+    from rich.panel import Panel
+    console = Console()
+    console.print("System initializing, please wait")
+
+    import os
+    from RealtimeSTT import AudioToTextRecorder  # Ensure this module has stop() or close() methods
+
+    import colorama
+    colorama.init()
+
+    # Import pyautogui
+    import pyautogui
+
+    import pyaudio
+    import numpy as np
+
+    # Initialize Rich Console and Live
+    live = Live(console=console, refresh_per_second=10, screen=False)
+    live.start()
+
+    # Global variables
+    full_sentences = []
+    rich_text_stored = ""
+    recorder = None
+    displayed_text = ""  # Used for tracking text that was already displayed
+
+    end_of_sentence_detection_pause = 0.45
+    unknown_sentence_detection_pause = 0.7
+    mid_sentence_detection_pause = 2.0
+
+    prev_text = ""
+
+    # Events to signal threads to exit or reset
+    exit_event = threading.Event()
+    reset_event = threading.Event()
+
+    def preprocess_text(text):
+        # Remove leading whitespaces
+        text = text.lstrip()
+
+        # Remove starting ellipses if present
+        if text.startswith("..."):
+            text = text[3:]
+
+        # Remove any leading whitespaces again after ellipses removal
+        text = text.lstrip()
+
+        # Uppercase the first letter
+        if text:
+            text = text[0].upper() + text[1:]
+
+        return text
+
+    def text_detected(text):
+        global prev_text, displayed_text, rich_text_stored
+
+        text = preprocess_text(text)
+
+        sentence_end_marks = ['.', '!', '?', '。']
+        if text.endswith("..."):
+            recorder.post_speech_silence_duration = mid_sentence_detection_pause
+        elif text and text[-1] in sentence_end_marks and prev_text and prev_text[-1] in sentence_end_marks:
+            recorder.post_speech_silence_duration = end_of_sentence_detection_pause
+        else:
+            recorder.post_speech_silence_duration = unknown_sentence_detection_pause
+
+        prev_text = text
+
+        # Build Rich Text with alternating colors
+        rich_text = Text()
+        for i, sentence in enumerate(full_sentences):
+            if i % 2 == 0:
+                rich_text += Text(sentence, style="yellow") + Text(" ")
+            else:
+                rich_text += Text(sentence, style="cyan") + Text(" ")
+
+        # If the current text is not a sentence-ending, display it in real-time
+        if text:
+            rich_text += Text(text, style="bold yellow")
+
+        new_displayed_text = rich_text.plain
+
+        if new_displayed_text != displayed_text:
+            displayed_text = new_displayed_text
+            panel = Panel(rich_text, title="[bold green]Live Transcription[/bold green]", border_style="bold green")
+            live.update(panel)
+            rich_text_stored = rich_text
+
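+    # process_text receives each finalized utterance from the recorder: it
+    # restores the default silence threshold, appends the sentence to the
+    # transcript, handles a pending reset request, and then pastes the text
+    # into the active window via the clipboard (Ctrl+V) after releasing any
+    # held modifier keys to prevent stuck-key side effects.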
+    def process_text(text):
+        global recorder, full_sentences, prev_text, displayed_text
+        recorder.post_speech_silence_duration = unknown_sentence_detection_pause
+        text = preprocess_text(text)
+        text = text.rstrip()
+        if text.endswith("..."):
+            text = text[:-2]
+
+        full_sentences.append(text)
+        prev_text = ""
+        text_detected("")
+
+        # Check if reset_event is set
+        if reset_event.is_set():
+            # Clear buffers
+            full_sentences.clear()
+            displayed_text = ""
+            reset_event.clear()
+            console.print("[bold magenta]Transcription buffer reset.[/bold magenta]")
+            return
+
+        # Type the finalized sentence into the active window
+        try:
+            # Release modifier keys to prevent stuck keys
+            for key in ['ctrl', 'shift', 'alt', 'win']:
+                keyboard.release(key)
+                pyautogui.keyUp(key)
+
+            # Use clipboard to paste text
+            pyperclip.copy(text + ' ')
+            pyautogui.hotkey('ctrl', 'v')
+
+        except Exception as e:
+            console.print(f"[bold red]Failed to type the text: {e}[/bold red]")
+
+    # Recorder configuration
+    recorder_config = {
+        'spinner': False,
+        'model': 'Systran/faster-distil-whisper-large-v3',  # distil-medium.en or large-v2 or deepdml/faster-whisper-large-v3-turbo-ct2 or ...
+        'input_device_index': 1,
+        'realtime_model_type': 'Systran/faster-distil-whisper-large-v3',  # Using the same model for realtime
+        'language': 'en',
+        'silero_sensitivity': 0.05,
+        'webrtc_sensitivity': 3,
+        'post_speech_silence_duration': unknown_sentence_detection_pause,
+        'min_length_of_recording': 1.1,
+        'min_gap_between_recordings': 0,
+        'enable_realtime_transcription': True,
+        'realtime_processing_pause': 0.02,
+        'on_realtime_transcription_update': text_detected,
+        # 'on_realtime_transcription_stabilized': text_detected,
+        'silero_deactivity_detection': True,
+        'early_transcription_on_silence': 0,
+        'beam_size': 5,
+        'beam_size_realtime': 5,  # Matching beam_size for consistency
+        'no_log_file': True,
+        'initial_prompt': "Use ellipses for incomplete sentences like: I went to the...",
+        'device': 'cuda',  # Added device configuration
+        'compute_type': 'float16'  # Added compute_type configuration
+    }
+
+    if EXTENDED_LOGGING:
+        recorder_config['level'] = logging.DEBUG
+
+    recorder = AudioToTextRecorder(**recorder_config)
+
+    initial_text = Panel(Text("Say something...", style="cyan bold"), title="[bold yellow]Waiting for Input[/bold yellow]", border_style="bold yellow")
+    live.update(initial_text)
+
+    # Print available hotkeys
+    console.print("[bold green]Available Hotkeys:[/bold green]")
+    console.print("[bold cyan]F1[/bold cyan]: Mute Microphone")
+    console.print("[bold cyan]F2[/bold cyan]: Unmute Microphone")
+    console.print("[bold cyan]F3[/bold cyan]: Start Static Recording")
+    console.print("[bold cyan]F4[/bold cyan]: Stop Static Recording")
+    console.print("[bold cyan]F5[/bold cyan]: Reset Transcription")
+
+    # Global variables for static recording
+    static_recording_active = False
+    static_recording_thread = None
+    static_audio_frames = []
+    live_recording_enabled = True  # Track whether live recording was enabled before static recording
+
+    # Audio settings for static recording
+    audio_settings = {
+        'FORMAT': pyaudio.paInt16,  # PyAudio format
+        'CHANNELS': 1,              # Mono audio
+        'RATE': 16000,              # Sample rate
+        'CHUNK': 1024               # Buffer size
+    }
+
+    # Note: The maximum recommended length of static recording is about 5 minutes.
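+    # At these settings the recording accumulates 16000 samples/s * 2 bytes
+    # (paInt16, mono) ~ 32 kB of raw audio per second, i.e. roughly 9.6 MB
+    # held in memory for a five-minute take.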
+
+    def static_recording_worker():
+        """
+        Worker function to record audio statically.
+        """
+        global static_audio_frames, static_recording_active
+        # Set up pyaudio
+        p = pyaudio.PyAudio()
+        # Use the same audio format as defined in audio_settings
+        FORMAT = audio_settings['FORMAT']
+        CHANNELS = audio_settings['CHANNELS']
+        RATE = audio_settings['RATE']    # Sample rate
+        CHUNK = audio_settings['CHUNK']  # Buffer size
+
+        # Open the audio stream
+        try:
+            stream = p.open(format=FORMAT,
+                            channels=CHANNELS,
+                            rate=RATE,
+                            input=True,
+                            frames_per_buffer=CHUNK)
+        except Exception as e:
+            console.print(f"[bold red]Failed to open audio stream for static recording: {e}[/bold red]")
+            static_recording_active = False
+            p.terminate()
+            return
+
+        while static_recording_active and not exit_event.is_set():
+            try:
+                data = stream.read(CHUNK)
+                static_audio_frames.append(data)
+            except Exception as e:
+                console.print(f"[bold red]Error during static recording: {e}[/bold red]")
+                break
+
+        # Stop and close the stream
+        stream.stop_stream()
+        stream.close()
+        p.terminate()
+
+    def start_static_recording():
+        """
+        Starts the static audio recording.
+        """
+        global static_recording_active, static_recording_thread, static_audio_frames, live_recording_enabled
+        if static_recording_active:
+            console.print("[bold yellow]Static recording is already in progress.[/bold yellow]")
+            return
+
+        # Mute the live recording microphone
+        live_recording_enabled = recorder.use_microphone.value
+        if live_recording_enabled:
+            recorder.set_microphone(False)
+            console.print("[bold yellow]Live microphone muted during static recording.[/bold yellow]")
+
+        console.print("[bold green]Starting static recording... Press F4 or F5 to stop/reset.[/bold green]")
+        static_audio_frames = []
+        static_recording_active = True
+        static_recording_thread = threading.Thread(target=static_recording_worker, daemon=True)
+        static_recording_thread.start()
+
+    def stop_static_recording():
+        """
+        Stops the static audio recording and processes the transcription.
+        """
+        global static_recording_active, static_recording_thread
+        if not static_recording_active:
+            console.print("[bold yellow]No static recording is in progress.[/bold yellow]")
+            return
+
+        console.print("[bold green]Stopping static recording...[/bold green]")
+        static_recording_active = False
+        if static_recording_thread is not None:
+            static_recording_thread.join()
+            static_recording_thread = None
+
+        # Start a new thread to process the transcription
+        processing_thread = threading.Thread(target=process_static_transcription, daemon=True)
+        processing_thread.start()
+
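+    # process_static_transcription converts the captured int16 frames to
+    # float32 in [-1.0, 1.0] (dividing by 32768) and transcribes them with a
+    # standalone faster_whisper.WhisperModel built from the same model,
+    # device and compute_type entries as recorder_config.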
+    def process_static_transcription():
+        global static_audio_frames, live_recording_enabled
+        if exit_event.is_set():
+            return
+        # Process the recorded audio
+        console.print("[bold green]Processing static recording...[/bold green]")
+
+        # Convert audio data to numpy array
+        audio_data = b''.join(static_audio_frames)
+        audio_array = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0
+
+        # Transcribe the audio data
+        try:
+            from faster_whisper import WhisperModel
+        except ImportError:
+            console.print("[bold red]faster_whisper is not installed. Please install it to use static transcription.[/bold red]")
+            return
+
+        # Load the model using recorder_config
+        model_size = recorder_config['model']
+        device = recorder_config['device']
+        compute_type = recorder_config['compute_type']
+
+        console.print("Loading transcription model... This may take a moment.")
+        try:
+            model = WhisperModel(model_size, device=device, compute_type=compute_type)
+        except Exception as e:
+            console.print(f"[bold red]Failed to load transcription model: {e}[/bold red]")
+            return
+
+        # Transcribe the audio
+        try:
+            segments, info = model.transcribe(audio_array, beam_size=recorder_config['beam_size'])
+            transcription = ' '.join([segment.text for segment in segments]).strip()
+        except Exception as e:
+            console.print(f"[bold red]Error during transcription: {e}[/bold red]")
+            return
+
+        # Display the transcription
+        console.print("Static Recording Transcription:")
+        console.print(f"[bold cyan]{transcription}[/bold cyan]")
+
+        # Type the transcription into the active window
+        try:
+            # Release modifier keys to prevent stuck keys
+            for key in ['ctrl', 'shift', 'alt', 'win']:
+                keyboard.release(key)
+                pyautogui.keyUp(key)
+
+            # Use clipboard to paste text
+            pyperclip.copy(transcription + ' ')
+            pyautogui.hotkey('ctrl', 'v')
+
+        except Exception as e:
+            console.print(f"[bold red]Failed to type the static transcription: {e}[/bold red]")
+
+        # Unmute the live recording microphone if it was enabled before
+        if live_recording_enabled and not exit_event.is_set():
+            recorder.set_microphone(True)
+            console.print("[bold yellow]Live microphone unmuted.[/bold yellow]")
+
+    def reset_transcription():
+        """
+        Resets the transcription by flushing ongoing recordings or buffers.
+        """
+        global static_recording_active, static_recording_thread, static_audio_frames
+        console.print("[bold magenta]Resetting transcription...[/bold magenta]")
+        if static_recording_active:
+            console.print("[bold magenta]Flushing static recording...[/bold magenta]")
+            # Stop static recording
+            static_recording_active = False
+            if static_recording_thread is not None:
+                static_recording_thread.join()
+                static_recording_thread = None
+            # Clear static audio frames
+            static_audio_frames = []
+            # Unmute microphone if it was muted during static recording
+            if live_recording_enabled:
+                recorder.set_microphone(True)
+                console.print("[bold yellow]Live microphone unmuted after reset.[/bold yellow]")
+        elif recorder.use_microphone.value:
+            # Live transcription is active and microphone is not muted
+            console.print("[bold magenta]Resetting live transcription buffer...[/bold magenta]")
+            reset_event.set()
+        else:
+            # Microphone is muted; nothing to reset
+            console.print("[bold yellow]Microphone is muted. Nothing to reset.[/bold yellow]")
+
+    # Hotkey Callback Functions
+
+    def mute_microphone():
+        recorder.set_microphone(False)
+        console.print("[bold red]Microphone muted.[/bold red]")
+
+    def unmute_microphone():
+        recorder.set_microphone(True)
+        console.print("[bold green]Microphone unmuted.[/bold green]")
+
+    # Start the transcription loop in a separate thread
+    def transcription_loop():
+        try:
+            while not exit_event.is_set():
+                recorder.text(process_text)
+        except Exception as e:
+            console.print(f"[bold red]Error in transcription loop: {e}[/bold red]")
+        finally:
+            # Do not call sys.exit() here
+            pass
+
+    # Start the transcription loop thread
+    transcription_thread = threading.Thread(target=transcription_loop, daemon=True)
+    transcription_thread.start()
+
+    # Define the hotkey combinations and their corresponding functions
+    keyboard.add_hotkey('F1', mute_microphone, suppress=True)
+    keyboard.add_hotkey('F2', unmute_microphone, suppress=True)
+    keyboard.add_hotkey('F3', start_static_recording, suppress=True)
+    keyboard.add_hotkey('F4', stop_static_recording, suppress=True)
+    keyboard.add_hotkey('F5', reset_transcription, suppress=True)
+
+    # Keep the main thread running and handle graceful exit
+    try:
+        keyboard.wait()  # Blocks the main thread until Ctrl+C raises KeyboardInterrupt
+    except KeyboardInterrupt:
+        console.print("[bold yellow]KeyboardInterrupt received. Exiting...[/bold yellow]")
+    finally:
+        # Signal threads to exit
+        exit_event.set()
+
+        # Reset transcription if needed
+        reset_transcription()
+
+        # Stop the recorder
+        try:
+            if hasattr(recorder, 'stop'):
+                recorder.stop()
+            elif hasattr(recorder, 'close'):
+                recorder.close()
+        except Exception as e:
+            console.print(f"[bold red]Error stopping recorder: {e}[/bold red]")
+
+        # Allow some time for threads to finish
+        time.sleep(1)
+
+        # Wait for transcription_thread to finish
+        if transcription_thread.is_alive():
+            transcription_thread.join(timeout=5)
+
+        # Stop the Live console
+        live.stop()
+
+        console.print("[bold red]Exiting gracefully...[/bold red]")
+        sys.exit(0)