diff --git a/examples/realtime/audio_util.py b/examples/realtime/audio_util.py
index 954a508675..d8840bc411 100644
--- a/examples/realtime/audio_util.py
+++ b/examples/realtime/audio_util.py
@@ -1,18 +1,13 @@
 from __future__ import annotations
 
 import io
-import base64
-import asyncio
 import threading
-from typing import Callable, Awaitable
 
 import numpy as np
 import pyaudio
 import sounddevice as sd
 from pydub import AudioSegment
 
-from openai.resources.realtime.realtime import AsyncRealtimeConnection
-
 CHUNK_LENGTH_S = 0.05  # 50ms
 SAMPLE_RATE = 24000
 FORMAT = pyaudio.paInt16
@@ -89,54 +84,4 @@ def stop(self):
         self.queue = []
 
     def terminate(self):
-        self.stream.close()
-
-
-async def send_audio_worker_sounddevice(
-    connection: AsyncRealtimeConnection,
-    should_send: Callable[[], bool] | None = None,
-    start_send: Callable[[], Awaitable[None]] | None = None,
-):
-    sent_audio = False
-
-    device_info = sd.query_devices()
-    print(device_info)
-
-    read_size = int(SAMPLE_RATE * 0.02)
-
-    stream = sd.InputStream(
-        channels=CHANNELS,
-        samplerate=SAMPLE_RATE,
-        dtype="int16",
-    )
-    stream.start()
-
-    try:
-        while True:
-            if stream.read_available < read_size:
-                await asyncio.sleep(0)
-                continue
-
-            data, _ = stream.read(read_size)
-
-            if should_send() if should_send else True:
-                if not sent_audio and start_send:
-                    await start_send()
-                await connection.send(
-                    {"type": "input_audio_buffer.append", "audio": base64.b64encode(data).decode("utf-8")}
-                )
-                sent_audio = True
-
-            elif sent_audio:
-                print("Done, triggering inference")
-                await connection.send({"type": "input_audio_buffer.commit"})
-                await connection.send({"type": "response.create", "response": {}})
-                sent_audio = False
-
-            await asyncio.sleep(0)
-
-    except KeyboardInterrupt:
-        pass
-    finally:
-        stream.stop()
-        stream.close()
+        self.stream.close()
\ No newline at end of file
diff --git a/examples/realtime/azure_realtime.py b/examples/realtime/azure_realtime.py
old mode 100644
new mode 100755
index 3cf64b8be9..131c8a35ae
--- a/examples/realtime/azure_realtime.py
+++ b/examples/realtime/azure_realtime.py
@@ -1,4 +1,31 @@
+#!/usr/bin/env uv run
+#
+# /// script
+# requires-python = ">=3.9"
+# dependencies = [
+#     "textual",
+#     "numpy",
+#     "pyaudio",
+#     "pydub",
+#     "sounddevice",
+#     "openai[realtime]",
+#     "azure-identity",
+#     "aiohttp",
+#     "python-dotenv",
+# ]
+#
+# [tool.uv.sources]
+# openai = { path = "../../", editable = true }
+# ///
+
+import logging
+from dotenv import load_dotenv
+import httpx
+
+load_dotenv()
+
 import os
+import base64
 import asyncio
 
 from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
@@ -11,6 +38,14 @@
 # Supported models and API versions: https://learn.microsoft.com/azure/ai-services/openai/how-to/realtime-audio#supported-models
 # Entra ID auth: https://learn.microsoft.com/azure/ai-services/openai/how-to/managed-identity
 
+logging.getLogger().setLevel(logging.DEBUG)
+logging.getLogger("websockets").setLevel(logging.DEBUG)
+
+logging.basicConfig(
+    format="%(asctime)s %(message)s",
+    level=logging.DEBUG,
+)
+
 
 async def main() -> None:
     """The following example demonstrates how to configure Azure OpenAI to use the Realtime API.
@@ -21,21 +56,40 @@ async def main() -> None:
     """
 
     credential = DefaultAzureCredential()
+
+    if not (api_key := os.environ.get("AZURE_OPENAI_API_KEY")):
+        token_provider = get_bearer_token_provider(credential, "https://cognitiveservices.azure.com/.default")
+    else:
+        token_provider = None
+
+    endpoint = httpx.URL(os.environ["AZURE_OPENAI_ENDPOINT"])
+    if endpoint.scheme in ("ws", "wss"):
+        websocket_base_url, azure_endpoint = f"{endpoint}/openai", None
+    else:
+        websocket_base_url, azure_endpoint = None, endpoint
+
+    print(f"{websocket_base_url=}, {azure_endpoint=}")
+
     client = AsyncAzureOpenAI(
-        azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
-        azure_ad_token_provider=get_bearer_token_provider(credential, "https://cognitiveservices.azure.com/.default"),
-        api_version="2024-10-01-preview",
-    )
-    async with client.realtime.connect(
+        azure_deployment="gpt-realtime",
+        azure_endpoint=str(azure_endpoint),
+        websocket_base_url=websocket_base_url,
+        azure_ad_token_provider=token_provider,
+        api_key=api_key,
+        api_version="2025-04-01-preview"
+    )  # type: ignore
+
+    async with client.beta.realtime.connect(
         model="gpt-realtime",  # deployment name for your model
     ) as connection:
         await connection.session.update(
             session={
-                "output_modalities": ["text"],
-                "model": "gpt-realtime",
-                "type": "realtime",
+                # "output_modalities": ["text"],
+                # "model": "gpt-realtime",
+                # "type": "realtime",
             }
         )
+
         while True:
             user_input = input("Enter a message: ")
             if user_input == "q":
@@ -48,14 +102,28 @@ async def main() -> None:
                     "content": [{"type": "input_text", "text": user_input}],
                 }
             )
+
             await connection.response.create()
 
             async for event in connection:
-                if event.type == "response.output_text.delta":
+                print(f"Event: {event.type}")
+
+                if event.type == "error":
+                    print(f"ERROR: {event}")
+
+                if event.type == "response.text.delta":
                     print(event.delta, flush=True, end="")
-                elif event.type == "response.output_text.done":
+                if event.type == "response.text.done":
                     print()
-                elif event.type == "response.done":
-                    break
+                if event.type == "response.done":
+                    print(f"final response: {event.response.output[0].content[0].transcript}")
+                    print(f"usage: {event.response.usage}")
+
+                if event.type == "response.audio.delta":
+                    audio_data = base64.b64decode(event.delta)
+                    print(f"Received {len(audio_data)} bytes of audio data.")
+
+                if event.type == "response.audio_transcript.delta":
+                    print(f"Received text delta: {event.delta}")
 
     await credential.close()
diff --git a/examples/realtime/push_to_talk_app.py b/examples/realtime/push_to_talk_app.py
index acf38995b2..c2837a6646 100755
--- a/examples/realtime/push_to_talk_app.py
+++ b/examples/realtime/push_to_talk_app.py
@@ -18,6 +18,9 @@
 #     "pydub",
 #     "sounddevice",
 #     "openai[realtime]",
+#     "azure-identity",
+#     "aiohttp",
+#     "python-dotenv",
 # ]
 #
 # [tool.uv.sources]
@@ -25,6 +28,12 @@
 # ///
 from __future__ import annotations
 
+from dotenv import load_dotenv
+import httpx
+
+load_dotenv()
+
+import os
 import base64
 import asyncio
 from typing import Any, cast
@@ -33,13 +42,14 @@
 from textual import events
 from audio_util import CHANNELS, SAMPLE_RATE, AudioPlayerAsync
 from textual.app import App, ComposeResult
-from textual.widgets import Button, Static, RichLog
+from textual.widgets import Static, RichLog
 from textual.reactive import reactive
+from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
 from textual.containers import Container
 
-from openai import AsyncOpenAI
-from openai.types.realtime.session import Session
+from openai import AsyncAzureOpenAI
 from openai.resources.realtime.realtime import AsyncRealtimeConnection
+from openai.types.realtime.session_update_event import Session
 
 
 class SessionDisplay(Static):
@@ -123,7 +133,7 @@ class RealtimeApp(App[None]):
     }
     """
 
-    client: AsyncOpenAI
+    client: AsyncAzureOpenAI
     should_send_audio: asyncio.Event
    audio_player: AudioPlayerAsync
     last_audio_item_id: str | None
@@ -135,7 +145,30 @@ def __init__(self) -> None:
         super().__init__()
         self.connection = None
         self.session = None
-        self.client = AsyncOpenAI()
+
+        if not (api_key := os.environ.get("AZURE_OPENAI_API_KEY")):
+            credential = DefaultAzureCredential()
+            token_provider = get_bearer_token_provider(credential, "https://cognitiveservices.azure.com/.default")
+        else:
+            token_provider = None
+
+        endpoint = httpx.URL(os.environ["AZURE_OPENAI_ENDPOINT"])
+        if endpoint.scheme in ("ws", "wss"):
+            websocket_base_url, azure_endpoint = f"{endpoint}/openai", None
+        else:
+            websocket_base_url, azure_endpoint = None, endpoint
+
+        print(f"{websocket_base_url=}, {azure_endpoint=}")
+
+        self.client = AsyncAzureOpenAI(
+            azure_deployment="gpt-realtime",
+            azure_endpoint=str(azure_endpoint),
+            websocket_base_url=websocket_base_url,
+            azure_ad_token_provider=token_provider,
+            api_key=api_key,
+            api_version="2025-04-01-preview"
+        )  # type: ignore
+
         self.audio_player = AudioPlayerAsync()
         self.last_audio_item_id = None
         self.should_send_audio = asyncio.Event()
@@ -154,21 +187,21 @@ async def on_mount(self) -> None:
         self.run_worker(self.send_mic_audio())
 
     async def handle_realtime_connection(self) -> None:
-        async with self.client.realtime.connect(model="gpt-realtime") as conn:
+        async with self.client.beta.realtime.connect(model="gpt-realtime") as conn:
             self.connection = conn
             self.connected.set()
 
             # note: this is the default and can be omitted
             # if you want to manually handle VAD yourself, then set `'turn_detection': None`
-            await conn.session.update(
-                session={
-                    "audio": {
-                        "input": {"turn_detection": {"type": "server_vad"}},
-                    },
-                    "model": "gpt-realtime",
-                    "type": "realtime",
-                }
-            )
+            # await conn.session.update(
+            #     session={
+            #         "audio": {
+            #             "input": {"turn_detection": {"type": "server_vad"}},
+            #         },
+            #         "model": "gpt-realtime",
+            #         "type": "realtime",
+            #     }
+            # )
 
             acc_items: dict[str, Any] = {}
 
@@ -184,7 +217,7 @@ async def handle_realtime_connection(self) -> None:
                     self.session = event.session
                     continue
 
-                if event.type == "response.output_audio.delta":
+                if event.type == "response.audio.delta":
                     if event.item_id != self.last_audio_item_id:
                         self.audio_player.reset_frame_count()
                         self.last_audio_item_id = event.item_id
@@ -193,7 +226,7 @@ async def handle_realtime_connection(self) -> None:
                     self.audio_player.add_data(bytes_data)
                     continue
 
-                if event.type == "response.output_audio_transcript.delta":
+                if event.type == "response.audio_transcript.delta":
                     try:
                         text = acc_items[event.item_id]
                     except KeyError:
@@ -258,10 +291,6 @@ async def send_mic_audio(self) -> None:
     async def on_key(self, event: events.Key) -> None:
         """Handle key press events."""
 
-        if event.key == "enter":
-            self.query_one(Button).press()
-            return
-
         if event.key == "q":
             self.exit()
             return
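
Reviewer aside, not part of the patch: the endpoint-scheme branch that both scripts now duplicate is easier to read outside diff form. Below is a minimal standalone sketch of that logic; the `resolve_endpoint` helper name and the endpoint values are illustrative, not from the patch.

    # Sketch only: mirrors the AZURE_OPENAI_ENDPOINT handling added in both scripts.
    # An http(s) endpoint is passed to AsyncAzureOpenAI as azure_endpoint, while a
    # ws(s) endpoint is routed through websocket_base_url with "/openai" appended.
    from __future__ import annotations

    import httpx

    def resolve_endpoint(raw: str) -> tuple[str | None, str | None]:
        """Return (websocket_base_url, azure_endpoint) for a given endpoint URL."""
        endpoint = httpx.URL(raw)
        if endpoint.scheme in ("ws", "wss"):
            return f"{endpoint}/openai", None
        return None, str(endpoint)

    # Illustrative values, not real endpoints:
    print(resolve_endpoint("wss://my-resource.openai.azure.com"))
    print(resolve_endpoint("https://my-resource.openai.azure.com"))

With the PEP 723 script headers added above, each example should be runnable directly (e.g. `uv run examples/realtime/azure_realtime.py`) once AZURE_OPENAI_ENDPOINT, and optionally AZURE_OPENAI_API_KEY, are set in the environment or a `.env` file; when the key is absent, the scripts fall back to Entra ID auth through DefaultAzureCredential.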