From 5daf28ee4509895167072f61145bad0cf645b678 Mon Sep 17 00:00:00 2001 From: Kushagro Bhattacharjee Date: Thu, 21 Aug 2025 22:01:22 -0700 Subject: [PATCH 1/8] working without echo cancellation --- agents/src/cli.ts | 52 +++ agents/src/voice/agent_session.ts | 66 ++-- agents/src/voice/chat_cli.ts | 484 +++++++++++++++++++++++ agents/src/voice/io.ts | 10 + agents/src/voice/native_audio.ts | 619 ++++++++++++++++++++++++++++++ examples/src/basic_agent.ts | 6 +- 6 files changed, 1214 insertions(+), 23 deletions(-) create mode 100644 agents/src/voice/chat_cli.ts create mode 100644 agents/src/voice/native_audio.ts diff --git a/agents/src/cli.ts b/agents/src/cli.ts index ec2452779..47317d5a4 100644 --- a/agents/src/cli.ts +++ b/agents/src/cli.ts @@ -220,5 +220,57 @@ export const runApp = (opts: WorkerOptions) => { }); }); + program + .command('console') + .description('Start a new conversation inside the console') + .action(async () => { + const options = program.optsWithGlobals(); + + opts.wsURL = opts.wsURL || 'ws://localhost:7881/fake_console_url'; + opts.apiKey = opts.apiKey || 'fake_console_key'; + opts.apiSecret = opts.apiSecret || 'fake_console_secret'; + + initializeLogger({ pretty: true, level: options.logLevel }); + const logger = log(); + + try { + const mod = await import(new URL(opts.agent, import.meta.url).href).catch(async () => { + const { pathToFileURL } = await import('node:url'); + const url = pathToFileURL(opts.agent); + return import(url.href); + }); + + const agentDef = (mod && mod.default) as { + entry: (ctx: unknown) => Promise; + prewarm?: (proc: unknown) => Promise | void; + }; + if (!agentDef || typeof agentDef.entry !== 'function') { + logger.fatal('default export is not an agent with an entry() function'); + process.exit(1); + } + + process.once('SIGINT', () => process.exit(130)); + process.once('SIGTERM', () => process.exit(143)); + + const mockCtx = { + room: undefined, + connect: async () => {}, + proc: { + userData: {}, + inferenceExecutor: null, // Add missing inference executor + }, + }; + + if (agentDef.prewarm) { + await agentDef.prewarm(mockCtx.proc as any); + } + + await agentDef.entry(mockCtx as any); + } catch (error) { + logger.fatal(error); + process.exit(1); + } + }); + program.parse(); }; diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index a88334f74..f87d891d4 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -169,7 +169,7 @@ export class AgentSession< outputOptions, }: { agent: Agent; - room: Room; + room?: Room; inputOptions?: Partial; outputOptions?: Partial; }): Promise { @@ -180,30 +180,50 @@ export class AgentSession< this.agent = agent; this._updateAgentState('initializing'); - // Check for existing input/output configuration and warn if needed - if (this.input.audio && inputOptions?.audioEnabled !== false) { - this.logger.warn('RoomIO audio input is enabled but input.audio is already set, ignoring..'); - } + // Check if this is console mode (no room provided) + if (!room) { + // Console mode - check if IO is already configured + if ( + this.input.audio !== null || + this.output.audio !== null || + this.output.transcription !== null + ) { + this.logger.warn( + 'Console mode detected but input.audio or output.audio or output.transcription is already set' + ); + } else { + // Auto-create ChatCLI for console mode + const { ChatCLI } = await import('./chat_cli.js'); + const chatCli = new ChatCLI(this); + await chatCli.start(); + } + } else { + // Room mode + // Check for 
existing input/output configuration and warn if needed + if (this.input.audio && inputOptions?.audioEnabled !== false) { + this.logger.warn('RoomIO audio input is enabled but input.audio is already set, ignoring..'); + } - if (this.output.audio && outputOptions?.audioEnabled !== false) { - this.logger.warn( - 'RoomIO audio output is enabled but output.audio is already set, ignoring..', - ); - } + if (this.output.audio && outputOptions?.audioEnabled !== false) { + this.logger.warn( + 'RoomIO audio output is enabled but output.audio is already set, ignoring..', + ); + } - if (this.output.transcription && outputOptions?.transcriptionEnabled !== false) { - this.logger.warn( - 'RoomIO transcription output is enabled but output.transcription is already set, ignoring..', - ); - } + if (this.output.transcription && outputOptions?.transcriptionEnabled !== false) { + this.logger.warn( + 'RoomIO transcription output is enabled but output.transcription is already set, ignoring..', + ); + } - this.roomIO = new RoomIO({ - agentSession: this, - room, - inputOptions, - outputOptions, - }); - this.roomIO.start(); + this.roomIO = new RoomIO({ + agentSession: this, + room, + inputOptions, + outputOptions, + }); + this.roomIO.start(); + } this.updateActivity(this.agent); @@ -221,6 +241,8 @@ export class AgentSession< this._updateAgentState('listening'); } + + updateAgent(agent: Agent): void { this.agent = agent; diff --git a/agents/src/voice/chat_cli.ts b/agents/src/voice/chat_cli.ts new file mode 100644 index 000000000..076d964b1 --- /dev/null +++ b/agents/src/voice/chat_cli.ts @@ -0,0 +1,484 @@ +import { AudioFrame } from '@livekit/rtc-node'; +import { EventEmitter } from 'node:events'; +import { createRequire } from 'node:module'; +import process from 'node:process'; +import readline from 'node:readline'; +import { setInterval as setIntervalSafe, clearInterval as clearIntervalSafe } from 'node:timers'; +import { log } from '../log.js'; +import { AsyncIterableQueue } from '../utils.js'; +import type { Agent } from './agent.js'; +import type { AgentSession } from './agent_session.js'; +import { AudioInput, AudioOutput, TextOutput, type PlaybackFinishedEvent } from './io.js'; +import { TranscriptionSynchronizer } from './transcription/synchronizer.js'; +import { ReadableStream } from 'node:stream/web'; + +const require = createRequire(import.meta.url); + +const MAX_AUDIO_BAR = 30; +const INPUT_DB_MIN = -70.0; +const INPUT_DB_MAX = 0.0; +const FPS = 16; +const MIN_RMS = 2.0; + +function esc(...codes: number[]): string { + return `\u001b[${codes.join(';')}m`; +} + +function clampNormalizeDb(amplitudeDb: number, dbMin: number, dbMax: number): number { + amplitudeDb = Math.max(dbMin, Math.min(amplitudeDb, dbMax)); + return (amplitudeDb - dbMin) / (dbMax - dbMin); +} + +class ConsoleAudioInput extends AudioInput { + private sampleRate: number; + private numChannels: number; + private framesPerBuffer: number; + private deviceId: number | undefined; + private ai: any | null = null; + private started = false; + private queue = new AsyncIterableQueue(); + private sourceSet = false; + private logger = log(); + private mockInterval: NodeJS.Timeout | null = null; + + microDb: number = INPUT_DB_MIN; + receivedAudio: boolean = false; + + constructor({ sampleRate = 24000, numChannels = 1, framesPerBuffer = 240, deviceId }: { sampleRate?: number; numChannels?: number; framesPerBuffer?: number; deviceId?: number } = {}) { + super(); + this.sampleRate = sampleRate; + this.numChannels = numChannels; + this.framesPerBuffer = 
framesPerBuffer; + this.deviceId = deviceId; + } + + async onAttached(): Promise { + + if (!this.sourceSet) { + const stream = new ReadableStream({ + start: async (controller) => { + (async () => { + for await (const frame of this.queue) { + controller.enqueue(frame); + } + controller.close(); + })().catch((error) => { + this.logger.error({ error }, 'ConsoleAudioInput stream error'); + }); + }, + cancel: async () => { + // noop + }, + }); + this.deferredStream.setSource(stream); + this.sourceSet = true; + } + + if (this.started) return; + await this.startDevice(); + } + + onDetached(): void { + if (!this.started) return; + try { + this.stopDevice(); + } catch (error) { + this.logger.warn({ error }, 'ConsoleAudioInput stopDevice error'); + } + } + + private async startDevice() { + try { + // Try to use our native audio implementation + const { AudioIO, SampleFormat16Bit } = await import('./native_audio.js'); + + this.ai = new AudioIO({ + inOptions: { + channelCount: this.numChannels, + sampleFormat: SampleFormat16Bit, + sampleRate: this.sampleRate, + framesPerBuffer: this.framesPerBuffer, + }, + outOptions: undefined, // input only + }); + + this.ai.on('data', (buf: Buffer) => { + // Convert to AudioFrame + const int16 = new Int16Array(buf.buffer, buf.byteOffset, buf.byteLength / 2); + const frame = new AudioFrame(int16, this.sampleRate, this.numChannels, int16.length); + + // Calculate audio level + const maxInt16 = 32767; + let rms = 0; + for (let i = 0; i < int16.length; i++) { + const v = int16[i]! / maxInt16; + rms += v * v; + } + rms = Math.sqrt(rms / int16.length) * maxInt16; + const db = 20.0 * Math.log10(rms / maxInt16 + 1e-6); + this.microDb = db; + if (rms > MIN_RMS) { + this.receivedAudio = true; + } + + this.queue.put(frame); + }); + + this.ai.on('error', (err: Error) => { + this.logger.error({ error: err }, 'Audio input error'); + }); + + this.ai.start(); + this.started = true; + // Audio input started successfully + } catch (error) { + // Fallback to mock audio + this.logger.warn('Native audio failed, using mock audio input'); + + const frameSize = this.framesPerBuffer; + const intervalMs = (frameSize / this.sampleRate) * 1000; + + this.mockInterval = setInterval(() => { + const silentData = new Int16Array(frameSize * this.numChannels); + const frame = new AudioFrame(silentData, this.sampleRate, this.numChannels, frameSize); + + this.microDb = INPUT_DB_MIN + Math.random() * 10; + this.receivedAudio = true; + this.queue.put(frame); + }, intervalMs); + + this.started = true; + } + } + + private stopDevice() { + if (this.mockInterval) { + clearInterval(this.mockInterval); + this.mockInterval = null; + } + if (this.ai) { + try { + this.ai.quit?.(); + } catch { + try { + this.ai.stop?.(); + } catch {} + } + this.ai = null; + } + this.started = false; + } +} + +class StdoutTextOutput extends TextOutput { + private capturing = false; + private enabled = true; + + async captureText(text: string): Promise { + if (!this.enabled) return; + if (!this.capturing) { + this.capturing = true; + process.stdout.write('\r'); + process.stdout.write(esc(36)); + } + process.stdout.write(text); + } + + flush(): void { + if (this.capturing) { + process.stdout.write(esc(0)); + this.capturing = false; + } + } + + setEnabled(enabled: boolean): void { + this.enabled = enabled; + if (!enabled) this.capturing = false; + } +} + +class ConsoleAudioOutput extends AudioOutput { + private outputSampleRate: number; + private numChannels: number; + private ao: any | null = null; + private started = false; + 
private pushedDuration = 0.0; + private captureStart = 0; + private dispatchTimer: NodeJS.Timeout | null = null; + private _logger = log(); + + constructor({ sampleRate = 24000, numChannels = 1 }: { sampleRate?: number; numChannels?: number } = {}) { + super(sampleRate); + this.outputSampleRate = sampleRate; + this.numChannels = numChannels; + } + + async onAttached(): Promise { + if (this.started) return; + + try { + // Try to use our native audio implementation + const { AudioIO } = await import('./native_audio.js'); + + this.ao = new AudioIO({ + inOptions: undefined, // output only + outOptions: { + channelCount: this.numChannels, + sampleRate: this.outputSampleRate, + }, + }); + + this.ao.start(); + this.started = true; + this._logger.info('Using native audio output via command-line tools'); + } catch (error) { + // Fallback to mock audio output + this._logger.warn('Native audio failed, using mock audio output', error); + + this.ao = { + write: (data: Buffer) => { + const frameCount = data.length / (2 * this.numChannels); + const durationMs = (frameCount / this.outputSampleRate) * 1000; + + setTimeout(() => { + this.emit('playbackFinished'); + }, durationMs); + }, + end: () => {}, + }; + + this.started = true; + } + } + + onDetached(): void { + if (!this.started) return; + try { + this.ao?.end?.(); + } catch {} + this.ao = null; + this.started = false; + } + + async captureFrame(frame: AudioFrame): Promise { + await super.captureFrame(frame); + if (!this.captureStart) { + this.captureStart = Date.now(); + } + this.pushedDuration += frame.samplesPerChannel / frame.sampleRate; + if (this.ao) { + const view = new Int16Array(frame.data); + const buf = Buffer.from(view.buffer, view.byteOffset, view.byteLength); + this.ao.write(buf); + } + } + + flush(): void { + super.flush(); + if (this.pushedDuration > 0) { + const elapsed = (Date.now() - this.captureStart) / 1000; + const toWait = Math.max(0, this.pushedDuration - elapsed); + if (this.dispatchTimer) clearTimeout(this.dispatchTimer); + this.dispatchTimer = setTimeout(() => this.dispatchPlaybackFinished(), toWait * 1000); + } + } + + clearBuffer(): void { + if (this.dispatchTimer) { + clearTimeout(this.dispatchTimer); + this.dispatchTimer = null; + } + const played = Math.min((Date.now() - this.captureStart) / 1000, this.pushedDuration); + this.onPlaybackFinished({ playbackPosition: played, interrupted: played + 1.0 < this.pushedDuration }); + this.pushedDuration = 0; + this.captureStart = 0; + } + + private dispatchPlaybackFinished(): void { + const ev: PlaybackFinishedEvent = { playbackPosition: this.pushedDuration, interrupted: false }; + this.onPlaybackFinished(ev); + this.pushedDuration = 0; + this.captureStart = 0; + } +} + +export class ChatCLI extends EventEmitter { + private loop: NodeJS.Timeout | null = null; + private session: AgentSession; + private textSink: StdoutTextOutput; + private audioSink: ConsoleAudioOutput; + private transcriptSyncer: TranscriptionSynchronizer | null = null; + private inputAudio: ConsoleAudioInput; + private mode: 'text' | 'audio' = 'audio'; + private textBuf: string[] = []; + private micName: string = 'Microphone'; + private logger = log(); + private micCheckTimer: NodeJS.Timeout | null = null; + + constructor(agentSession: AgentSession, { syncTranscription = true }: { syncTranscription?: boolean } = {}) { + super(); + this.session = agentSession; + this.textSink = new StdoutTextOutput(); + this.audioSink = new ConsoleAudioOutput(); + this.inputAudio = new ConsoleAudioInput(); + + if 
(syncTranscription) { + this.transcriptSyncer = new TranscriptionSynchronizer(this.audioSink, this.textSink); + } + } + + async start(): Promise { + if (this.transcriptSyncer) { + this.updateTextOutput({ enable: true, stdoutEnable: false }); + } + + this.updateMicrophone(true); + this.updateSpeaker(true); + this.renderLoopStart(); + this.stdinStart(); + } + + stop(): void { + this.renderLoopStop(); + this.stdinStop(); + this.updateMicrophone(false); + this.updateSpeaker(false); + } + + private renderLoopStart() { + const interval = 1000 / FPS; + this.loop = setIntervalSafe(() => { + if (this.mode === 'audio') { + this.printAudioMode(); + } else if (this.mode === 'text' && !(this.textSink as any).capturing) { + this.printTextMode(); + } + }, interval); + } + + private renderLoopStop() { + if (this.loop) clearIntervalSafe(this.loop); + this.loop = null; + } + + private stdinStart() { + if (!process.stdin.isTTY) return; + process.stdin.setRawMode?.(true); + process.stdin.resume(); + process.stdin.setEncoding('utf8'); + process.stdin.on('data', this.onStdinData); + readline.emitKeypressEvents(process.stdin); + } + + private stdinStop() { + try { + process.stdin.off('data', this.onStdinData); + process.stdin.setRawMode?.(false); + } catch {} + } + + private onStdinData = (chunk: string) => { + for (const ch of chunk) { + if (ch === '\u0003') { + this.stop(); + process.exit(0); + return; + } + + if (ch === '\u0002') { + if (this.mode === 'audio') { + this.mode = 'text'; + this.updateTextOutput({ enable: true, stdoutEnable: true }); + this.updateMicrophone(false); + this.updateSpeaker(false); + process.stdout.write('\nSwitched to Text Input Mode.'); + } else { + this.mode = 'audio'; + this.updateTextOutput({ enable: true, stdoutEnable: false }); + this.updateMicrophone(true); + this.updateSpeaker(true); + this.textBuf = []; + process.stdout.write('\nSwitched to Audio Input Mode.'); + } + continue; + } + + if (this.mode === 'text') { + if (ch === '\r' || ch === '\n') { + const text = this.textBuf.join(''); + if (text) { + this.textBuf = []; + try { + this.session.interrupt(); + } catch {} + this.session.generateReply({ userInput: text }); + process.stdout.write('\n'); + } + } else if (ch === '\u007f') { + if (this.textBuf.length) { + this.textBuf.pop(); + process.stdout.write('\b \b'); + } + } else if (isPrintable(ch)) { + this.textBuf.push(ch); + process.stdout.write(ch); + } + } + } + }; + + private updateMicrophone(enable: boolean) { + if (enable) { + this.session.input.audio = this.inputAudio; + if (this.micCheckTimer) clearTimeout(this.micCheckTimer); + this.micCheckTimer = setTimeout(() => this.checkMicReceivedAudio(), 5000); + } else { + this.session.input.audio = null; + } + } + + private updateSpeaker(enable: boolean) { + if (enable) { + this.session.output.audio = this.transcriptSyncer ? this.transcriptSyncer.audioOutput : this.audioSink; + } else { + this.session.output.audio = null; + } + } + + private updateTextOutput({ enable, stdoutEnable }: { enable: boolean; stdoutEnable: boolean }) { + if (enable) { + this.session.output.transcription = this.transcriptSyncer ? this.transcriptSyncer.textOutput : this.textSink; + this.textSink.setEnabled(stdoutEnable); + } else { + this.session.output.transcription = null; + this.textBuf = []; + } + } + + private checkMicReceivedAudio() { + if (!this.inputAudio.receivedAudio) { + this.logger.error('No audio input detected. 
Check microphone permissions.'); + } + } + + private printAudioMode() { + const amplitude = clampNormalizeDb(this.inputAudio.microDb, INPUT_DB_MIN, INPUT_DB_MAX); + const nbBar = Math.round(amplitude * MAX_AUDIO_BAR); + const colorCode = amplitude > 0.75 ? 31 : amplitude > 0.5 ? 33 : 32; + const bar = '#'.repeat(nbBar) + '-'.repeat(MAX_AUDIO_BAR - nbBar); + // Clear the line and write the audio status + process.stdout.write(`\r\x1b[K[Audio] ${this.micName.slice(-20)} [${this.inputAudio.microDb.toFixed(2)} dBFS] ${esc(colorCode)}[${bar}]${esc(0)}`); + } + + private printTextMode() { + process.stdout.write('\r'); + const prompt = 'Enter your message: '; + process.stdout.write(`[Text ${prompt}${this.textBuf.join('')}`); + } +} + +function isPrintable(ch: string) { + if (ch.length !== 1) return false; + const code = ch.charCodeAt(0); + return code >= 32 && code !== 127; +} diff --git a/agents/src/voice/io.ts b/agents/src/voice/io.ts index d21044c43..11c5b1c73 100644 --- a/agents/src/voice/io.ts +++ b/agents/src/voice/io.ts @@ -200,8 +200,18 @@ export class AgentInput { } set audio(stream: AudioInput | null) { + // Detach old stream + if (this._audioStream && this._audioEnabled) { + this._audioStream.onDetached(); + } + this._audioStream = stream; this.audioChanged(); + + // Attach new stream if enabled + if (this._audioStream && this._audioEnabled) { + this._audioStream.onAttached(); + } } } diff --git a/agents/src/voice/native_audio.ts b/agents/src/voice/native_audio.ts new file mode 100644 index 000000000..7504c37bd --- /dev/null +++ b/agents/src/voice/native_audio.ts @@ -0,0 +1,619 @@ +import { Readable, Writable, Duplex, Transform } from 'stream'; +import { spawn, ChildProcess } from 'child_process'; +import { EventEmitter } from 'events'; +import * as os from 'os'; + +export const SampleFormat8Bit = 8; +export const SampleFormat16Bit = 16; +export const SampleFormat24Bit = 24; +export const SampleFormat32Bit = 32; +export const SampleFormatFloat32 = 1; + +interface AudioOptions { + sampleRate?: number; + channelCount?: number; + sampleFormat?: number; + deviceId?: number; + framesPerBuffer?: number; + closeOnError?: boolean; + highwaterMark?: number; +} + +interface AudioIOOptions { + inOptions?: AudioOptions; + outOptions?: AudioOptions; +} + +function getSampleFormatArgs(sampleFormat: number): { bitDepth: string; encoding: string } { + switch (sampleFormat) { + case SampleFormat8Bit: + return { bitDepth: '8', encoding: 'unsigned-integer' }; + case SampleFormat16Bit: + return { bitDepth: '16', encoding: 'signed-integer' }; + case SampleFormat24Bit: + return { bitDepth: '24', encoding: 'signed-integer' }; + case SampleFormat32Bit: + return { bitDepth: '32', encoding: 'signed-integer' }; + case SampleFormatFloat32: + return { bitDepth: '32', encoding: 'floating-point' }; + default: + return { bitDepth: '16', encoding: 'signed-integer' }; + } +} + +class AudioInputStream extends Readable { + private process: ChildProcess | null = null; + private options: AudioOptions; + private isStarted = false; + private buffer: Buffer[] = []; + private totalBytesRead = 0; + private startTime: number = 0; + + constructor(options: AudioOptions) { + super({ + highWaterMark: options.highwaterMark || 16384, + objectMode: false + }); + this.options = { + sampleRate: 44100, + channelCount: 2, + sampleFormat: SampleFormat16Bit, + deviceId: -1, + closeOnError: true, + ...options + }; + } + + start() { + if (this.isStarted) return; + this.isStarted = true; + this.startTime = Date.now(); + this.startRecording(); + 
} + + private startRecording() { + const { sampleRate, channelCount, sampleFormat } = this.options; + const { bitDepth, encoding } = getSampleFormatArgs(sampleFormat!); + const platform = os.platform(); + + try { + if (platform === 'darwin') { + this.process = spawn('sox', [ + '-d', + '-r', String(sampleRate), + '-c', String(channelCount), + '-b', bitDepth, + '-e', encoding, + '-t', 'raw', + '-' + ], { + stdio: ['ignore', 'pipe', 'ignore'] + }); + } else if (platform === 'linux') { + const format = sampleFormat === SampleFormat16Bit ? 'S16_LE' : + sampleFormat === SampleFormat32Bit ? 'S32_LE' : 'S16_LE'; + + this.process = spawn('arecord', [ + '-f', format, + '-r', String(sampleRate), + '-c', String(channelCount), + '-t', 'raw', + '-q', + '-' + ], { + stdio: ['ignore', 'pipe', 'ignore'] + }); + } else if (platform === 'win32') { + const format = sampleFormat === SampleFormat16Bit ? 's16le' : + sampleFormat === SampleFormat32Bit ? 's32le' : + sampleFormat === SampleFormatFloat32 ? 'f32le' : 's16le'; + + this.process = spawn('ffmpeg', [ + '-f', 'dshow', + '-i', 'audio="Microphone (Realtek Audio)"', + '-ar', String(sampleRate), + '-ac', String(channelCount), + '-f', format, + '-' + ], { + stdio: ['ignore', 'pipe', 'ignore'] + }); + } + + if (this.process && this.process.stdout) { + this.process.stdout.on('data', (chunk: Buffer) => { + const timestamp = (Date.now() - this.startTime) / 1000; + (chunk as any).timestamp = timestamp; + this.totalBytesRead += chunk.length; + + + + if (!this.push(chunk)) { + this.process?.stdout?.pause(); + } + }); + + this.process.stderr?.on('data', (data) => { + // Ignore stderr output + }); + + this.process.on('error', (err) => { + if (this.options.closeOnError) { + this.destroy(err); + } else { + this.emit('error', err); + } + }); + + this.process.on('exit', (code, signal) => { + if (code !== 0 && code !== null) { + const err = new Error(`Audio input process exited with code ${code}`); + if (this.options.closeOnError) { + this.destroy(err); + } else { + this.emit('error', err); + } + } + this.push(null); + }); + } + } catch (err) { + if (this.options.closeOnError) { + this.destroy(err as Error); + } else { + this.emit('error', err); + } + } + } + + _read() { + if (this.process?.stdout) { + this.process.stdout.resume(); + } + if (!this.isStarted) { + this.start(); + } + } + + _destroy(err: Error | null, callback: (err: Error | null) => void) { + if (this.process) { + this.process.kill('SIGTERM'); + this.process = null; + } + callback(err); + } + + quit(callback?: () => void) { + this.destroy(); + if (callback) callback(); + } + + abort(callback?: () => void) { + this.destroy(); + if (callback) callback(); + } +} + +class AudioOutputStream extends Writable { + private process: ChildProcess | null = null; + private options: AudioOptions; + private isStarted = false; + private totalBytesWritten = 0; + + constructor(options: AudioOptions) { + super({ + highWaterMark: options.highwaterMark || 16384, + objectMode: false, + decodeStrings: false + }); + this.options = { + sampleRate: 44100, + channelCount: 2, + sampleFormat: SampleFormat16Bit, + deviceId: -1, + closeOnError: true, + ...options + }; + } + + start() { + if (this.isStarted) return; + this.isStarted = true; + this.startPlayback(); + } + + private startPlayback() { + const { sampleRate, channelCount, sampleFormat } = this.options; + const { bitDepth, encoding } = getSampleFormatArgs(sampleFormat!); + const platform = os.platform(); + + try { + if (platform === 'darwin') { + this.process = spawn('sox', [ + 
'-r', String(sampleRate), + '-c', String(channelCount), + '-b', bitDepth, + '-e', encoding, + '-t', 'raw', + '-', + '-d' + ], { + stdio: ['pipe', 'ignore', 'ignore'] + }); + } else if (platform === 'linux') { + const format = sampleFormat === SampleFormat16Bit ? 'S16_LE' : + sampleFormat === SampleFormat32Bit ? 'S32_LE' : 'S16_LE'; + + this.process = spawn('aplay', [ + '-f', format, + '-r', String(sampleRate), + '-c', String(channelCount), + '-t', 'raw', + '-q' + ], { + stdio: ['pipe', 'ignore', 'ignore'] + }); + } else if (platform === 'win32') { + const format = sampleFormat === SampleFormat16Bit ? 's16le' : + sampleFormat === SampleFormat32Bit ? 's32le' : + sampleFormat === SampleFormatFloat32 ? 'f32le' : 's16le'; + + this.process = spawn('ffmpeg', [ + '-f', format, + '-ar', String(sampleRate), + '-ac', String(channelCount), + '-i', '-', + '-f', 'dsound', + 'default' + ], { + stdio: ['pipe', 'ignore', 'ignore'] + }); + } + + if (this.process) { + this.process.on('error', (err) => { + if (this.options.closeOnError) { + this.destroy(err); + } else { + this.emit('error', err); + } + }); + + this.process.on('exit', (code) => { + if (code !== 0 && code !== null) { + const err = new Error(`Audio output process exited with code ${code}`); + if (this.options.closeOnError) { + this.destroy(err); + } else { + this.emit('error', err); + } + } + }); + } + } catch (err) { + if (this.options.closeOnError) { + this.destroy(err as Error); + } else { + this.emit('error', err); + } + } + } + + _write(chunk: any, encoding: string, callback: (error?: Error | null) => void) { + if (!this.isStarted) { + this.start(); + } + + if (this.process && this.process.stdin) { + this.totalBytesWritten += chunk.length; + this.process.stdin.write(chunk, callback); + } else { + callback(new Error('Audio output process not initialized')); + } + } + + _destroy(err: Error | null, callback: (err: Error | null) => void) { + if (this.process) { + if (this.process.stdin) { + this.process.stdin.end(); + } + this.process.kill('SIGTERM'); + this.process = null; + } + callback(err); + } + + _final(callback: (error?: Error | null) => void) { + if (this.process && this.process.stdin) { + this.process.stdin.end(); + } + callback(); + } + + quit(callback?: () => void) { + this.end(); + if (callback) callback(); + } + + abort(callback?: () => void) { + this.destroy(); + if (callback) callback(); + } +} + +class AudioDuplexStream extends Duplex { + private inputStream: AudioInputStream; + private outputStream: AudioOutputStream; + + constructor(options: AudioIOOptions) { + const inOpts = options.inOptions || {}; + const outOpts = options.outOptions || {}; + + super({ + allowHalfOpen: false, + readableHighWaterMark: inOpts.highwaterMark || 16384, + writableHighWaterMark: outOpts.highwaterMark || 16384, + objectMode: false, + decodeStrings: false + }); + + this.inputStream = new AudioInputStream(inOpts); + this.outputStream = new AudioOutputStream(outOpts); + + this.inputStream.on('data', (chunk) => { + if (!this.push(chunk)) { + this.inputStream.pause(); + } + }); + + this.inputStream.on('end', () => { + this.push(null); + }); + + this.inputStream.on('error', (err) => { + this.destroy(err); + }); + + this.outputStream.on('error', (err) => { + this.destroy(err); + }); + } + + start() { + this.inputStream.start(); + this.outputStream.start(); + } + + _read() { + this.inputStream.resume(); + } + + _write(chunk: any, encoding: string, callback: (error?: Error | null) => void) { + this.outputStream.write(chunk, encoding as BufferEncoding, 
callback); + } + + _destroy(err: Error | null, callback: (err: Error | null) => void) { + this.inputStream.destroy(); + this.outputStream.destroy(); + callback(err); + } + + quit(callback?: () => void) { + this.inputStream.quit(); + this.outputStream.quit(); + if (callback) callback(); + } + + abort(callback?: () => void) { + this.inputStream.abort(); + this.outputStream.abort(); + if (callback) callback(); + } +} + +export class AudioIO extends EventEmitter { + private stream: Readable | Writable | Duplex; + private options: AudioIOOptions; + + constructor(options: AudioIOOptions) { + super(); + this.options = options; + + if (options.inOptions && options.outOptions) { + this.stream = new AudioDuplexStream(options); + } else if (options.inOptions) { + this.stream = new AudioInputStream(options.inOptions); + } else if (options.outOptions) { + this.stream = new AudioOutputStream(options.outOptions); + } else { + throw new Error('AudioIO requires either inOptions or outOptions'); + } + + this.stream.on('error', (err) => { + this.emit('error', err); + }); + + this.stream.on('close', () => { + this.emit('close'); + this.emit('closed'); + }); + + this.stream.on('finish', () => { + this.quit(); + this.emit('finish'); + this.emit('finished'); + }); + } + + start() { + if ('start' in this.stream) { + (this.stream as any).start(); + } + return this.stream; + } + + quit(callback?: () => void) { + if ('quit' in this.stream) { + (this.stream as any).quit(callback); + } else { + this.stream.destroy(); + if (callback) callback(); + } + } + + abort(callback?: () => void) { + if ('abort' in this.stream) { + (this.stream as any).abort(callback); + } else { + this.stream.destroy(); + if (callback) callback(); + } + } + + pipe(destination: T, options?: { end?: boolean }): T { + return this.stream.pipe(destination, options); + } + + unpipe(destination?: NodeJS.WritableStream): this { + (this.stream as Readable).unpipe(destination); + return this; + } + + write(chunk: any, encoding?: BufferEncoding | ((error?: Error | null) => void), callback?: (error?: Error | null) => void): boolean { + if (this.stream instanceof Writable || this.stream instanceof Duplex) { + if (typeof encoding === 'function') { + return this.stream.write(chunk, encoding); + } else if (encoding) { + return this.stream.write(chunk, encoding, callback); + } else { + return this.stream.write(chunk, callback); + } + } + return false; + } + + end(chunk?: any, encoding?: BufferEncoding | (() => void), callback?: () => void): void { + if (this.stream instanceof Writable || this.stream instanceof Duplex) { + if (typeof encoding === 'function') { + this.stream.end(chunk, encoding); + } else if (encoding) { + this.stream.end(chunk, encoding, callback); + } else if (chunk) { + this.stream.end(chunk, callback); + } else { + this.stream.end(); + } + } + } + + on(event: string | symbol, listener: (...args: any[]) => void): this { + if (event === 'data' && (this.stream instanceof Readable || this.stream instanceof Duplex)) { + this.stream.on('data', listener); + } else { + super.on(event, listener); + } + return this; + } + + once(event: string | symbol, listener: (...args: any[]) => void): this { + if (event === 'data' && (this.stream instanceof Readable || this.stream instanceof Duplex)) { + this.stream.once('data', listener); + } else { + super.once(event, listener); + } + return this; + } +} + +export function getDevices(): Array { + const platform = os.platform(); + const devices = []; + + if (platform === 'darwin') { + try { + const result = 
spawn('system_profiler', ['SPAudioDataType']); + devices.push({ + id: 0, + name: 'Built-in Microphone', + maxInputChannels: 2, + maxOutputChannels: 0, + defaultSampleRate: 44100, + defaultLowInputLatency: 0.002, + defaultLowOutputLatency: 0.01, + defaultHighInputLatency: 0.012, + defaultHighOutputLatency: 0.1, + hostAPIName: 'Core Audio' + }); + devices.push({ + id: 1, + name: 'Built-in Output', + maxInputChannels: 0, + maxOutputChannels: 2, + defaultSampleRate: 44100, + defaultLowInputLatency: 0.01, + defaultLowOutputLatency: 0.002, + defaultHighInputLatency: 0.1, + defaultHighOutputLatency: 0.012, + hostAPIName: 'Core Audio' + }); + } catch (e) { + // Fall through to defaults + } + } + + if (devices.length === 0) { + devices.push({ + id: -1, + name: 'Default Input Device', + maxInputChannels: 2, + maxOutputChannels: 0, + defaultSampleRate: 44100, + defaultLowInputLatency: 0.01, + defaultLowOutputLatency: 0.01, + defaultHighInputLatency: 0.1, + defaultHighOutputLatency: 0.1, + hostAPIName: 'Default' + }); + devices.push({ + id: -1, + name: 'Default Output Device', + maxInputChannels: 0, + maxOutputChannels: 2, + defaultSampleRate: 44100, + defaultLowInputLatency: 0.01, + defaultLowOutputLatency: 0.01, + defaultHighInputLatency: 0.1, + defaultHighOutputLatency: 0.1, + hostAPIName: 'Default' + }); + } + + return devices; +} + +export function getHostAPIs(): any { + const platform = os.platform(); + let hostAPIName = 'Default'; + + if (platform === 'darwin') { + hostAPIName = 'Core Audio'; + } else if (platform === 'win32') { + hostAPIName = 'MME'; + } else if (platform === 'linux') { + hostAPIName = 'ALSA'; + } + + return { + defaultHostAPI: 0, + HostAPIs: [ + { + id: 0, + name: hostAPIName, + type: hostAPIName, + deviceCount: 2, + defaultInput: 0, + defaultOutput: 1 + } + ] + }; +} \ No newline at end of file diff --git a/examples/src/basic_agent.ts b/examples/src/basic_agent.ts index d981973f8..90fdca29f 100644 --- a/examples/src/basic_agent.ts +++ b/examples/src/basic_agent.ts @@ -30,6 +30,9 @@ export default defineAgent({ const vad = ctx.proc.userData.vad! as silero.VAD; + // Check if we're in console mode (no room provided) + const isConsoleMode = !ctx.room; + const session = new voice.AgentSession({ vad, stt: new deepgram.STT(), @@ -37,7 +40,8 @@ export default defineAgent({ llm: new openai.LLM(), // to use realtime model, replace the stt, llm, tts and vad with the following // llm: new openai.realtime.RealtimeModel(), - turnDetection: new livekit.turnDetector.MultilingualModel(), + // Only use turn detection in room mode, not in console mode + ...(isConsoleMode ? 
{} : { turnDetection: new livekit.turnDetector.MultilingualModel() }), }); const usageCollector = new metrics.UsageCollector(); From 69f568ba7cba758b68adb520a34112c2a507f983 Mon Sep 17 00:00:00 2001 From: Kushagro Bhattacharjee Date: Fri, 22 Aug 2025 14:32:59 -0700 Subject: [PATCH 2/8] better logging --- agents/src/voice/chat_cli.ts | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/agents/src/voice/chat_cli.ts b/agents/src/voice/chat_cli.ts index 076d964b1..fb922c564 100644 --- a/agents/src/voice/chat_cli.ts +++ b/agents/src/voice/chat_cli.ts @@ -187,6 +187,7 @@ class StdoutTextOutput extends TextOutput { flush(): void { if (this.capturing) { process.stdout.write(esc(0)); + process.stdout.write('\n'); this.capturing = false; } } @@ -195,6 +196,10 @@ class StdoutTextOutput extends TextOutput { this.enabled = enabled; if (!enabled) this.capturing = false; } + + get isCapturing(): boolean { + return this.capturing; + } } class ConsoleAudioOutput extends AudioOutput { @@ -314,6 +319,9 @@ export class ChatCLI extends EventEmitter { private micName: string = 'Microphone'; private logger = log(); private micCheckTimer: NodeJS.Timeout | null = null; + private currentAudioLine: string = ''; + private isLogging: boolean = false; + constructor(agentSession: AgentSession, { syncTranscription = true }: { syncTranscription?: boolean } = {}) { super(); @@ -325,6 +333,9 @@ export class ChatCLI extends EventEmitter { if (syncTranscription) { this.transcriptSyncer = new TranscriptionSynchronizer(this.audioSink, this.textSink); } + + // Set logger to only show warnings and errors in console mode + this.logger.level = 'warn'; } async start(): Promise { @@ -348,9 +359,9 @@ export class ChatCLI extends EventEmitter { private renderLoopStart() { const interval = 1000 / FPS; this.loop = setIntervalSafe(() => { - if (this.mode === 'audio') { + if (this.mode === 'audio' && !this.textSink.isCapturing) { this.printAudioMode(); - } else if (this.mode === 'text' && !(this.textSink as any).capturing) { + } else if (this.mode === 'text' && !this.textSink.isCapturing) { this.printTextMode(); } }, interval); @@ -466,8 +477,8 @@ export class ChatCLI extends EventEmitter { const nbBar = Math.round(amplitude * MAX_AUDIO_BAR); const colorCode = amplitude > 0.75 ? 31 : amplitude > 0.5 ? 
33 : 32; const bar = '#'.repeat(nbBar) + '-'.repeat(MAX_AUDIO_BAR - nbBar); - // Clear the line and write the audio status - process.stdout.write(`\r\x1b[K[Audio] ${this.micName.slice(-20)} [${this.inputAudio.microDb.toFixed(2)} dBFS] ${esc(colorCode)}[${bar}]${esc(0)}`); + this.currentAudioLine = `[Audio] ${this.micName.slice(-20)} [${this.inputAudio.microDb.toFixed(2)} dBFS] ${esc(colorCode)}[${bar}]${esc(0)}`; + process.stdout.write(`\r${this.currentAudioLine}`); } private printTextMode() { From 9ea279711a2edbc1f52b9de9e40f7639776a9e06 Mon Sep 17 00:00:00 2001 From: Kushagro Bhattacharjee Date: Fri, 22 Aug 2025 19:31:19 -0700 Subject: [PATCH 3/8] simplify console mode --- agents/src/cli.ts | 40 +++++++++++----------------------------- 1 file changed, 11 insertions(+), 29 deletions(-) diff --git a/agents/src/cli.ts b/agents/src/cli.ts index 47317d5a4..c43bacd01 100644 --- a/agents/src/cli.ts +++ b/agents/src/cli.ts @@ -225,47 +225,29 @@ export const runApp = (opts: WorkerOptions) => { .description('Start a new conversation inside the console') .action(async () => { const options = program.optsWithGlobals(); - - opts.wsURL = opts.wsURL || 'ws://localhost:7881/fake_console_url'; - opts.apiKey = opts.apiKey || 'fake_console_key'; - opts.apiSecret = opts.apiSecret || 'fake_console_secret'; - initializeLogger({ pretty: true, level: options.logLevel }); const logger = log(); try { - const mod = await import(new URL(opts.agent, import.meta.url).href).catch(async () => { - const { pathToFileURL } = await import('node:url'); - const url = pathToFileURL(opts.agent); - return import(url.href); - }); - - const agentDef = (mod && mod.default) as { - entry: (ctx: unknown) => Promise; - prewarm?: (proc: unknown) => Promise | void; - }; - if (!agentDef || typeof agentDef.entry !== 'function') { - logger.fatal('default export is not an agent with an entry() function'); + const mod = await import(new URL(opts.agent, import.meta.url).href); + const agentDef = mod?.default; + + if (!agentDef?.entry) { + logger.fatal('default export must have an entry() function'); process.exit(1); } - process.once('SIGINT', () => process.exit(130)); - process.once('SIGTERM', () => process.exit(143)); - const mockCtx = { room: undefined, - connect: async () => {}, - proc: { - userData: {}, - inferenceExecutor: null, // Add missing inference executor - }, + proc: { userData: {} }, + connect: async () => {} }; - + if (agentDef.prewarm) { - await agentDef.prewarm(mockCtx.proc as any); + await agentDef.prewarm(mockCtx.proc); } - - await agentDef.entry(mockCtx as any); + + await agentDef.entry(mockCtx); } catch (error) { logger.fatal(error); process.exit(1); From f70e8065be9d53be443aec243c083900c61c2d48 Mon Sep 17 00:00:00 2001 From: Kushagro Bhattacharjee Date: Fri, 22 Aug 2025 22:12:37 -0700 Subject: [PATCH 4/8] fix agent session --- agents/src/voice/agent_session.ts | 36 ++++--------------------------- 1 file changed, 4 insertions(+), 32 deletions(-) diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index f87d891d4..3f20c5599 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -182,40 +182,12 @@ export class AgentSession< // Check if this is console mode (no room provided) if (!room) { - // Console mode - check if IO is already configured - if ( - this.input.audio !== null || - this.output.audio !== null || - this.output.transcription !== null - ) { - this.logger.warn( - 'Console mode detected but input.audio or output.audio or output.transcription is 
already set' - ); - } else { - // Auto-create ChatCLI for console mode - const { ChatCLI } = await import('./chat_cli.js'); - const chatCli = new ChatCLI(this); - await chatCli.start(); - } + // Auto-create ChatCLI for console mode + const { ChatCLI } = await import('./chat_cli.js'); + const chatCli = new ChatCLI(this); + await chatCli.start(); } else { // Room mode - // Check for existing input/output configuration and warn if needed - if (this.input.audio && inputOptions?.audioEnabled !== false) { - this.logger.warn('RoomIO audio input is enabled but input.audio is already set, ignoring..'); - } - - if (this.output.audio && outputOptions?.audioEnabled !== false) { - this.logger.warn( - 'RoomIO audio output is enabled but output.audio is already set, ignoring..', - ); - } - - if (this.output.transcription && outputOptions?.transcriptionEnabled !== false) { - this.logger.warn( - 'RoomIO transcription output is enabled but output.transcription is already set, ignoring..', - ); - } - this.roomIO = new RoomIO({ agentSession: this, room, From c75828230ad0a8ee6f9b50a2b5160ccc7a3bb24d Mon Sep 17 00:00:00 2001 From: Kushagro Bhattacharjee Date: Fri, 22 Aug 2025 22:27:37 -0700 Subject: [PATCH 5/8] another agent session fix --- agents/src/voice/agent_session.ts | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 3f20c5599..e4d51eec7 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -180,14 +180,28 @@ export class AgentSession< this.agent = agent; this._updateAgentState('initializing'); - // Check if this is console mode (no room provided) if (!room) { - // Auto-create ChatCLI for console mode - const { ChatCLI } = await import('./chat_cli.js'); - const chatCli = new ChatCLI(this); - await chatCli.start(); + const { ChatCLI } = await import('./chat_cli.js'); + const chatCli = new ChatCLI(this); + await chatCli.start(); } else { // Room mode + if (this.input.audio && inputOptions?.audioEnabled !== false) { + this.logger.warn('RoomIO audio input is enabled but input.audio is already set, ignoring..'); + } + + if (this.output.audio && outputOptions?.audioEnabled !== false) { + this.logger.warn( + 'RoomIO audio output is enabled but output.audio is already set, ignoring..', + ); + } + + if (this.output.transcription && outputOptions?.transcriptionEnabled !== false) { + this.logger.warn( + 'RoomIO transcription output is enabled but output.transcription is already set, ignoring..', + ); + } + this.roomIO = new RoomIO({ agentSession: this, room, From a1b4cbbe9d4b0918df3f4143115c8edddb5b25a8 Mon Sep 17 00:00:00 2001 From: Kushagro Bhattacharjee Date: Fri, 22 Aug 2025 23:16:28 -0700 Subject: [PATCH 6/8] cleanup and inference executor --- agents/src/cli.ts | 39 ++++++++++++++++++++++++++++++++++--- examples/src/basic_agent.ts | 6 +----- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/agents/src/cli.ts b/agents/src/cli.ts index c43bacd01..fef574209 100644 --- a/agents/src/cli.ts +++ b/agents/src/cli.ts @@ -227,6 +227,11 @@ export const runApp = (opts: WorkerOptions) => { const options = program.optsWithGlobals(); initializeLogger({ pretty: true, level: options.logLevel }); const logger = log(); + + if (!process.env.LIVEKIT_ENABLE_CONSOLE_MODE) { + logger.fatal('Console mode is disabled. 
Set LIVEKIT_ENABLE_CONSOLE_MODE=true to enable.'); + process.exit(1); + } try { const mod = await import(new URL(opts.agent, import.meta.url).href); @@ -237,17 +242,45 @@ export const runApp = (opts: WorkerOptions) => { process.exit(1); } + const { CurrentJobContext } = await import('./job.js'); + const { InferenceRunner } = await import('./inference_runner.js'); + + let inferenceExecutor: any = undefined; + if (Object.entries(InferenceRunner.registeredRunners).length) { + const runners: { [id: string]: any } = {}; + for (const [method, importPath] of Object.entries(InferenceRunner.registeredRunners)) { + logger.debug(`Loading inference runner for ${method}`); + const mod = await import(importPath); + runners[method] = new mod.default(); + await runners[method].initialize(); + } + + inferenceExecutor = { + doInference: async (method: string, data: unknown) => { + const runner = runners[method]; + if (!runner) { + logger.warn(`No runner found for method ${method}`); + return {}; + } + return runner.run(data); + } + }; + } + const mockCtx = { room: undefined, proc: { userData: {} }, - connect: async () => {} + connect: async () => {}, + inferenceExecutor: inferenceExecutor || { doInference: async () => ({}) } }; + new CurrentJobContext(mockCtx as any); + if (agentDef.prewarm) { - await agentDef.prewarm(mockCtx.proc); + await agentDef.prewarm(mockCtx.proc as any); } - await agentDef.entry(mockCtx); + await agentDef.entry(mockCtx as any); } catch (error) { logger.fatal(error); process.exit(1); diff --git a/examples/src/basic_agent.ts b/examples/src/basic_agent.ts index 90fdca29f..cac80738a 100644 --- a/examples/src/basic_agent.ts +++ b/examples/src/basic_agent.ts @@ -29,9 +29,6 @@ export default defineAgent({ }); const vad = ctx.proc.userData.vad! as silero.VAD; - - // Check if we're in console mode (no room provided) - const isConsoleMode = !ctx.room; const session = new voice.AgentSession({ vad, @@ -40,8 +37,7 @@ export default defineAgent({ llm: new openai.LLM(), // to use realtime model, replace the stt, llm, tts and vad with the following // llm: new openai.realtime.RealtimeModel(), - // Only use turn detection in room mode, not in console mode - ...(isConsoleMode ? 
{} : { turnDetection: new livekit.turnDetector.MultilingualModel() }), + turnDetection: new livekit.turnDetector.MultilingualModel(), }); const usageCollector = new metrics.UsageCollector(); From ed98fc4c9e73f47e1c89c37390f5755da5761c87 Mon Sep 17 00:00:00 2001 From: Kushagro Bhattacharjee Date: Fri, 22 Aug 2025 23:26:38 -0700 Subject: [PATCH 7/8] mock inference executor --- agents/src/cli.ts | 27 +++------------------------ 1 file changed, 3 insertions(+), 24 deletions(-) diff --git a/agents/src/cli.ts b/agents/src/cli.ts index fef574209..f3d65f32a 100644 --- a/agents/src/cli.ts +++ b/agents/src/cli.ts @@ -243,35 +243,14 @@ export const runApp = (opts: WorkerOptions) => { } const { CurrentJobContext } = await import('./job.js'); - const { InferenceRunner } = await import('./inference_runner.js'); - - let inferenceExecutor: any = undefined; - if (Object.entries(InferenceRunner.registeredRunners).length) { - const runners: { [id: string]: any } = {}; - for (const [method, importPath] of Object.entries(InferenceRunner.registeredRunners)) { - logger.debug(`Loading inference runner for ${method}`); - const mod = await import(importPath); - runners[method] = new mod.default(); - await runners[method].initialize(); - } - - inferenceExecutor = { - doInference: async (method: string, data: unknown) => { - const runner = runners[method]; - if (!runner) { - logger.warn(`No runner found for method ${method}`); - return {}; - } - return runner.run(data); - } - }; - } const mockCtx = { room: undefined, proc: { userData: {} }, connect: async () => {}, - inferenceExecutor: inferenceExecutor || { doInference: async () => ({}) } + inferenceExecutor: { + doInference: async () => ({}) // Mock inference executor for now to keep things simple + } }; new CurrentJobContext(mockCtx as any); From b634e9b34ef65f594bbcbebd1d48d935ca9498c6 Mon Sep 17 00:00:00 2001 From: Kushagro Bhattacharjee Date: Fri, 22 Aug 2025 23:31:31 -0700 Subject: [PATCH 8/8] lint --- agents/src/cli.ts | 16 +- agents/src/voice/agent_session.ts | 12 +- agents/src/voice/chat_cli.ts | 73 ++++++--- agents/src/voice/io.ts | 4 +- agents/src/voice/native_audio.ts | 263 +++++++++++++++++++----------- examples/src/basic_agent.ts | 1 - turbo.json | 3 +- 7 files changed, 230 insertions(+), 142 deletions(-) diff --git a/agents/src/cli.ts b/agents/src/cli.ts index f3d65f32a..97f512d37 100644 --- a/agents/src/cli.ts +++ b/agents/src/cli.ts @@ -227,7 +227,7 @@ export const runApp = (opts: WorkerOptions) => { const options = program.optsWithGlobals(); initializeLogger({ pretty: true, level: options.logLevel }); const logger = log(); - + if (!process.env.LIVEKIT_ENABLE_CONSOLE_MODE) { logger.fatal('Console mode is disabled. 
Set LIVEKIT_ENABLE_CONSOLE_MODE=true to enable.'); process.exit(1); @@ -236,29 +236,29 @@ export const runApp = (opts: WorkerOptions) => { try { const mod = await import(new URL(opts.agent, import.meta.url).href); const agentDef = mod?.default; - + if (!agentDef?.entry) { logger.fatal('default export must have an entry() function'); process.exit(1); } const { CurrentJobContext } = await import('./job.js'); - + const mockCtx = { room: undefined, proc: { userData: {} }, connect: async () => {}, inferenceExecutor: { - doInference: async () => ({}) // Mock inference executor for now to keep things simple - } + doInference: async () => ({}), // Mock inference executor for now to keep things simple + }, }; - + new CurrentJobContext(mockCtx as any); - + if (agentDef.prewarm) { await agentDef.prewarm(mockCtx.proc as any); } - + await agentDef.entry(mockCtx as any); } catch (error) { logger.fatal(error); diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index e4d51eec7..fe2d06257 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -181,13 +181,15 @@ export class AgentSession< this._updateAgentState('initializing'); if (!room) { - const { ChatCLI } = await import('./chat_cli.js'); - const chatCli = new ChatCLI(this); - await chatCli.start(); + const { ChatCLI } = await import('./chat_cli.js'); + const chatCli = new ChatCLI(this); + await chatCli.start(); } else { // Room mode if (this.input.audio && inputOptions?.audioEnabled !== false) { - this.logger.warn('RoomIO audio input is enabled but input.audio is already set, ignoring..'); + this.logger.warn( + 'RoomIO audio input is enabled but input.audio is already set, ignoring..', + ); } if (this.output.audio && outputOptions?.audioEnabled !== false) { @@ -227,8 +229,6 @@ export class AgentSession< this._updateAgentState('listening'); } - - updateAgent(agent: Agent): void { this.agent = agent; diff --git a/agents/src/voice/chat_cli.ts b/agents/src/voice/chat_cli.ts index fb922c564..3d0dd654d 100644 --- a/agents/src/voice/chat_cli.ts +++ b/agents/src/voice/chat_cli.ts @@ -1,18 +1,20 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
+// +// SPDX-License-Identifier: Apache-2.0 import { AudioFrame } from '@livekit/rtc-node'; import { EventEmitter } from 'node:events'; import { createRequire } from 'node:module'; import process from 'node:process'; import readline from 'node:readline'; -import { setInterval as setIntervalSafe, clearInterval as clearIntervalSafe } from 'node:timers'; +import { ReadableStream } from 'node:stream/web'; +import { clearInterval as clearIntervalSafe, setInterval as setIntervalSafe } from 'node:timers'; import { log } from '../log.js'; import { AsyncIterableQueue } from '../utils.js'; -import type { Agent } from './agent.js'; import type { AgentSession } from './agent_session.js'; -import { AudioInput, AudioOutput, TextOutput, type PlaybackFinishedEvent } from './io.js'; +import { AudioInput, AudioOutput, type PlaybackFinishedEvent, TextOutput } from './io.js'; import { TranscriptionSynchronizer } from './transcription/synchronizer.js'; -import { ReadableStream } from 'node:stream/web'; -const require = createRequire(import.meta.url); +const _require = createRequire(import.meta.url); const MAX_AUDIO_BAR = 30; const INPUT_DB_MIN = -70.0; @@ -44,7 +46,17 @@ class ConsoleAudioInput extends AudioInput { microDb: number = INPUT_DB_MIN; receivedAudio: boolean = false; - constructor({ sampleRate = 24000, numChannels = 1, framesPerBuffer = 240, deviceId }: { sampleRate?: number; numChannels?: number; framesPerBuffer?: number; deviceId?: number } = {}) { + constructor({ + sampleRate = 24000, + numChannels = 1, + framesPerBuffer = 240, + deviceId, + }: { + sampleRate?: number; + numChannels?: number; + framesPerBuffer?: number; + deviceId?: number; + } = {}) { super(); this.sampleRate = sampleRate; this.numChannels = numChannels; @@ -53,7 +65,6 @@ class ConsoleAudioInput extends AudioInput { } async onAttached(): Promise { - if (!this.sourceSet) { const stream = new ReadableStream({ start: async (controller) => { @@ -91,7 +102,7 @@ class ConsoleAudioInput extends AudioInput { try { // Try to use our native audio implementation const { AudioIO, SampleFormat16Bit } = await import('./native_audio.js'); - + this.ai = new AudioIO({ inOptions: { channelCount: this.numChannels, @@ -134,19 +145,19 @@ class ConsoleAudioInput extends AudioInput { } catch (error) { // Fallback to mock audio this.logger.warn('Native audio failed, using mock audio input'); - + const frameSize = this.framesPerBuffer; const intervalMs = (frameSize / this.sampleRate) * 1000; - + this.mockInterval = setInterval(() => { const silentData = new Int16Array(frameSize * this.numChannels); const frame = new AudioFrame(silentData, this.sampleRate, this.numChannels, frameSize); - + this.microDb = INPUT_DB_MIN + Math.random() * 10; this.receivedAudio = true; this.queue.put(frame); }, intervalMs); - + this.started = true; } } @@ -212,7 +223,10 @@ class ConsoleAudioOutput extends AudioOutput { private dispatchTimer: NodeJS.Timeout | null = null; private _logger = log(); - constructor({ sampleRate = 24000, numChannels = 1 }: { sampleRate?: number; numChannels?: number } = {}) { + constructor({ + sampleRate = 24000, + numChannels = 1, + }: { sampleRate?: number; numChannels?: number } = {}) { super(sampleRate); this.outputSampleRate = sampleRate; this.numChannels = numChannels; @@ -220,11 +234,11 @@ class ConsoleAudioOutput extends AudioOutput { async onAttached(): Promise { if (this.started) return; - + try { // Try to use our native audio implementation const { AudioIO } = await import('./native_audio.js'); - + this.ao = new AudioIO({ inOptions: 
undefined, // output only outOptions: { @@ -232,26 +246,26 @@ class ConsoleAudioOutput extends AudioOutput { sampleRate: this.outputSampleRate, }, }); - + this.ao.start(); this.started = true; this._logger.info('Using native audio output via command-line tools'); } catch (error) { // Fallback to mock audio output this._logger.warn('Native audio failed, using mock audio output', error); - + this.ao = { write: (data: Buffer) => { const frameCount = data.length / (2 * this.numChannels); const durationMs = (frameCount / this.outputSampleRate) * 1000; - + setTimeout(() => { this.emit('playbackFinished'); }, durationMs); }, end: () => {}, }; - + this.started = true; } } @@ -294,7 +308,10 @@ class ConsoleAudioOutput extends AudioOutput { this.dispatchTimer = null; } const played = Math.min((Date.now() - this.captureStart) / 1000, this.pushedDuration); - this.onPlaybackFinished({ playbackPosition: played, interrupted: played + 1.0 < this.pushedDuration }); + this.onPlaybackFinished({ + playbackPosition: played, + interrupted: played + 1.0 < this.pushedDuration, + }); this.pushedDuration = 0; this.captureStart = 0; } @@ -322,14 +339,16 @@ export class ChatCLI extends EventEmitter { private currentAudioLine: string = ''; private isLogging: boolean = false; - - constructor(agentSession: AgentSession, { syncTranscription = true }: { syncTranscription?: boolean } = {}) { + constructor( + agentSession: AgentSession, + { syncTranscription = true }: { syncTranscription?: boolean } = {}, + ) { super(); this.session = agentSession; this.textSink = new StdoutTextOutput(); this.audioSink = new ConsoleAudioOutput(); this.inputAudio = new ConsoleAudioInput(); - + if (syncTranscription) { this.transcriptSyncer = new TranscriptionSynchronizer(this.audioSink, this.textSink); } @@ -450,7 +469,9 @@ export class ChatCLI extends EventEmitter { private updateSpeaker(enable: boolean) { if (enable) { - this.session.output.audio = this.transcriptSyncer ? this.transcriptSyncer.audioOutput : this.audioSink; + this.session.output.audio = this.transcriptSyncer + ? this.transcriptSyncer.audioOutput + : this.audioSink; } else { this.session.output.audio = null; } @@ -458,7 +479,9 @@ export class ChatCLI extends EventEmitter { private updateTextOutput({ enable, stdoutEnable }: { enable: boolean; stdoutEnable: boolean }) { if (enable) { - this.session.output.transcription = this.transcriptSyncer ? this.transcriptSyncer.textOutput : this.textSink; + this.session.output.transcription = this.transcriptSyncer + ? this.transcriptSyncer.textOutput + : this.textSink; this.textSink.setEnabled(stdoutEnable); } else { this.session.output.transcription = null; diff --git a/agents/src/voice/io.ts b/agents/src/voice/io.ts index 11c5b1c73..8acccba6f 100644 --- a/agents/src/voice/io.ts +++ b/agents/src/voice/io.ts @@ -204,10 +204,10 @@ export class AgentInput { if (this._audioStream && this._audioEnabled) { this._audioStream.onDetached(); } - + this._audioStream = stream; this.audioChanged(); - + // Attach new stream if enabled if (this._audioStream && this._audioEnabled) { this._audioStream.onAttached(); diff --git a/agents/src/voice/native_audio.ts b/agents/src/voice/native_audio.ts index 7504c37bd..4209c18ab 100644 --- a/agents/src/voice/native_audio.ts +++ b/agents/src/voice/native_audio.ts @@ -1,7 +1,10 @@ -import { Readable, Writable, Duplex, Transform } from 'stream'; -import { spawn, ChildProcess } from 'child_process'; +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
+//
+// SPDX-License-Identifier: Apache-2.0
+import { type ChildProcess, spawn } from 'child_process';
 import { EventEmitter } from 'events';
 import * as os from 'os';
+import { Duplex, Readable, Writable } from 'stream';
 
 export const SampleFormat8Bit = 8;
 export const SampleFormat16Bit = 16;
@@ -50,9 +53,9 @@ class AudioInputStream extends Readable {
   private startTime: number = 0;
 
   constructor(options: AudioOptions) {
-    super({
+    super({
       highWaterMark: options.highwaterMark || 16384,
-      objectMode: false
+      objectMode: false,
     });
     this.options = {
       sampleRate: 44100,
@@ -60,7 +63,7 @@ class AudioInputStream extends Readable {
       sampleFormat: SampleFormat16Bit,
       deviceId: -1,
       closeOnError: true,
-      ...options
+      ...options,
     };
   }
 
@@ -78,46 +81,81 @@ class AudioInputStream extends Readable {
 
     try {
       if (platform === 'darwin') {
-        this.process = spawn('sox', [
-          '-d',
-          '-r', String(sampleRate),
-          '-c', String(channelCount),
-          '-b', bitDepth,
-          '-e', encoding,
-          '-t', 'raw',
-          '-'
-        ], {
-          stdio: ['ignore', 'pipe', 'ignore']
-        });
+        this.process = spawn(
+          'sox',
+          [
+            '-d',
+            '-r',
+            String(sampleRate),
+            '-c',
+            String(channelCount),
+            '-b',
+            bitDepth,
+            '-e',
+            encoding,
+            '-t',
+            'raw',
+            '-',
+          ],
+          {
+            stdio: ['ignore', 'pipe', 'ignore'],
+          },
+        );
       } else if (platform === 'linux') {
-        const format = sampleFormat === SampleFormat16Bit ? 'S16_LE' :
-                       sampleFormat === SampleFormat32Bit ? 'S32_LE' : 'S16_LE';
-
-        this.process = spawn('arecord', [
-          '-f', format,
-          '-r', String(sampleRate),
-          '-c', String(channelCount),
-          '-t', 'raw',
-          '-q',
-          '-'
-        ], {
-          stdio: ['ignore', 'pipe', 'ignore']
-        });
+        const format =
+          sampleFormat === SampleFormat16Bit
+            ? 'S16_LE'
+            : sampleFormat === SampleFormat32Bit
+              ? 'S32_LE'
+              : 'S16_LE';
+
+        this.process = spawn(
+          'arecord',
+          [
+            '-f',
+            format,
+            '-r',
+            String(sampleRate),
+            '-c',
+            String(channelCount),
+            '-t',
+            'raw',
+            '-q',
+            '-',
+          ],
+          {
+            stdio: ['ignore', 'pipe', 'ignore'],
+          },
+        );
       } else if (platform === 'win32') {
-        const format = sampleFormat === SampleFormat16Bit ? 's16le' :
-                       sampleFormat === SampleFormat32Bit ? 's32le' :
-                       sampleFormat === SampleFormatFloat32 ? 'f32le' : 's16le';
-
-        this.process = spawn('ffmpeg', [
-          '-f', 'dshow',
-          '-i', 'audio="Microphone (Realtek Audio)"',
-          '-ar', String(sampleRate),
-          '-ac', String(channelCount),
-          '-f', format,
-          '-'
-        ], {
-          stdio: ['ignore', 'pipe', 'ignore']
-        });
+        const format =
+          sampleFormat === SampleFormat16Bit
+            ? 's16le'
+            : sampleFormat === SampleFormat32Bit
+              ? 's32le'
+              : sampleFormat === SampleFormatFloat32
+                ? 'f32le'
+                : 's16le';
+
+        this.process = spawn(
+          'ffmpeg',
+          [
+            '-f',
+            'dshow',
+            '-i',
+            'audio="Microphone (Realtek Audio)"',
+            '-ar',
+            String(sampleRate),
+            '-ac',
+            String(channelCount),
+            '-f',
+            format,
+            '-',
+          ],
+          {
+            stdio: ['ignore', 'pipe', 'ignore'],
+          },
+        );
       }
 
       if (this.process && this.process.stdout) {
@@ -125,15 +163,13 @@ class AudioInputStream extends Readable {
 
           const timestamp = (Date.now() - this.startTime) / 1000;
           (chunk as any).timestamp = timestamp;
           this.totalBytesRead += chunk.length;
-
-
           if (!this.push(chunk)) {
             this.process?.stdout?.pause();
           }
         });
 
-        this.process.stderr?.on('data', (data) => {
+        this.process.stderr?.on('data', (_data) => {
          // Ignore stderr output
        });
@@ -145,7 +181,7 @@ class AudioInputStream extends Readable {
         }
       });
 
-      this.process.on('exit', (code, signal) => {
+      this.process.on('exit', (code, _signal) => {
         if (code !== 0 && code !== null) {
           const err = new Error(`Audio input process exited with code ${code}`);
           if (this.options.closeOnError) {
@@ -201,10 +237,10 @@ class AudioOutputStream extends Writable {
   private totalBytesWritten = 0;
 
   constructor(options: AudioOptions) {
-    super({
+    super({
       highWaterMark: options.highwaterMark || 16384,
       objectMode: false,
-      decodeStrings: false
+      decodeStrings: false,
     });
     this.options = {
       sampleRate: 44100,
@@ -212,7 +248,7 @@ class AudioOutputStream extends Writable {
       sampleFormat: SampleFormat16Bit,
       deviceId: -1,
       closeOnError: true,
-      ...options
+      ...options,
     };
   }
 
@@ -229,45 +265,70 @@ class AudioOutputStream extends Writable {
 
     try {
       if (platform === 'darwin') {
-        this.process = spawn('sox', [
-          '-r', String(sampleRate),
-          '-c', String(channelCount),
-          '-b', bitDepth,
-          '-e', encoding,
-          '-t', 'raw',
-          '-',
-          '-d'
-        ], {
-          stdio: ['pipe', 'ignore', 'ignore']
-        });
+        this.process = spawn(
+          'sox',
+          [
+            '-r',
+            String(sampleRate),
+            '-c',
+            String(channelCount),
+            '-b',
+            bitDepth,
+            '-e',
+            encoding,
+            '-t',
+            'raw',
+            '-',
+            '-d',
+          ],
+          {
+            stdio: ['pipe', 'ignore', 'ignore'],
+          },
+        );
       } else if (platform === 'linux') {
-        const format = sampleFormat === SampleFormat16Bit ? 'S16_LE' :
-                       sampleFormat === SampleFormat32Bit ? 'S32_LE' : 'S16_LE';
-
-        this.process = spawn('aplay', [
-          '-f', format,
-          '-r', String(sampleRate),
-          '-c', String(channelCount),
-          '-t', 'raw',
-          '-q'
-        ], {
-          stdio: ['pipe', 'ignore', 'ignore']
-        });
+        const format =
+          sampleFormat === SampleFormat16Bit
+            ? 'S16_LE'
+            : sampleFormat === SampleFormat32Bit
+              ? 'S32_LE'
+              : 'S16_LE';
+
+        this.process = spawn(
+          'aplay',
+          ['-f', format, '-r', String(sampleRate), '-c', String(channelCount), '-t', 'raw', '-q'],
+          {
+            stdio: ['pipe', 'ignore', 'ignore'],
+          },
+        );
       } else if (platform === 'win32') {
-        const format = sampleFormat === SampleFormat16Bit ? 's16le' :
-                       sampleFormat === SampleFormat32Bit ? 's32le' :
-                       sampleFormat === SampleFormatFloat32 ? 'f32le' : 's16le';
-
-        this.process = spawn('ffmpeg', [
-          '-f', format,
-          '-ar', String(sampleRate),
-          '-ac', String(channelCount),
-          '-i', '-',
-          '-f', 'dsound',
-          'default'
-        ], {
-          stdio: ['pipe', 'ignore', 'ignore']
-        });
+        const format =
+          sampleFormat === SampleFormat16Bit
+            ? 's16le'
+            : sampleFormat === SampleFormat32Bit
+              ? 's32le'
+              : sampleFormat === SampleFormatFloat32
+                ? 'f32le'
+                : 's16le';
+
+        this.process = spawn(
+          'ffmpeg',
+          [
+            '-f',
+            format,
+            '-ar',
+            String(sampleRate),
+            '-ac',
+            String(channelCount),
+            '-i',
+            '-',
+            '-f',
+            'dsound',
+            'default',
+          ],
+          {
+            stdio: ['pipe', 'ignore', 'ignore'],
+          },
        );
      }
 
      if (this.process) {
@@ -348,13 +409,13 @@ class AudioDuplexStream extends Duplex {
   constructor(options: AudioIOOptions) {
     const inOpts = options.inOptions || {};
     const outOpts = options.outOptions || {};
-
+
     super({
       allowHalfOpen: false,
       readableHighWaterMark: inOpts.highwaterMark || 16384,
       writableHighWaterMark: outOpts.highwaterMark || 16384,
       objectMode: false,
-      decodeStrings: false
+      decodeStrings: false,
     });
 
     this.inputStream = new AudioInputStream(inOpts);
@@ -479,7 +540,11 @@ export class AudioIO extends EventEmitter {
     return this;
   }
 
-  write(chunk: any, encoding?: BufferEncoding | ((error?: Error | null) => void), callback?: (error?: Error | null) => void): boolean {
+  write(
+    chunk: any,
+    encoding?: BufferEncoding | ((error?: Error | null) => void),
+    callback?: (error?: Error | null) => void,
+  ): boolean {
     if (this.stream instanceof Writable || this.stream instanceof Duplex) {
       if (typeof encoding === 'function') {
         return this.stream.write(chunk, encoding);
@@ -531,7 +596,7 @@ export function getDevices(): Array {
 
   if (platform === 'darwin') {
     try {
-      const result = spawn('system_profiler', ['SPAudioDataType']);
+      const _result = spawn('system_profiler', ['SPAudioDataType']);
       devices.push({
         id: 0,
         name: 'Built-in Microphone',
@@ -542,7 +607,7 @@ export function getDevices(): Array {
         defaultLowOutputLatency: 0.01,
         defaultHighInputLatency: 0.012,
         defaultHighOutputLatency: 0.1,
-        hostAPIName: 'Core Audio'
+        hostAPIName: 'Core Audio',
       });
       devices.push({
         id: 1,
@@ -554,7 +619,7 @@ export function getDevices(): Array {
         defaultLowOutputLatency: 0.002,
         defaultHighInputLatency: 0.1,
         defaultHighOutputLatency: 0.012,
-        hostAPIName: 'Core Audio'
+        hostAPIName: 'Core Audio',
       });
     } catch (e) {
      // Fall through to defaults
@@ -572,7 +637,7 @@ export function getDevices(): Array {
      defaultLowOutputLatency: 0.01,
      defaultHighInputLatency: 0.1,
      defaultHighOutputLatency: 0.1,
-      hostAPIName: 'Default'
+      hostAPIName: 'Default',
    });
    devices.push({
      id: -1,
@@ -584,7 +649,7 @@ export function getDevices(): Array {
      defaultLowOutputLatency: 0.01,
      defaultHighInputLatency: 0.1,
      defaultHighOutputLatency: 0.1,
-      hostAPIName: 'Default'
+      hostAPIName: 'Default',
    });
  }
 
@@ -612,8 +677,8 @@ export function getHostAPIs(): any {
         type: hostAPIName,
         deviceCount: 2,
         defaultInput: 0,
-        defaultOutput: 1
-      }
-    ]
+        defaultOutput: 1,
+      },
+    ],
   };
-}
\ No newline at end of file
+}
diff --git a/examples/src/basic_agent.ts b/examples/src/basic_agent.ts
index cac80738a..3f658b2d5 100644
--- a/examples/src/basic_agent.ts
+++ b/examples/src/basic_agent.ts
@@ -29,7 +29,6 @@ export default defineAgent({
     });
 
     const vad = ctx.proc.userData.vad! as silero.VAD;
-
     const session = new voice.AgentSession({
       vad,
       stt: new deepgram.STT(),
diff --git a/turbo.json b/turbo.json
index 147df0d2d..2a46b4c52 100644
--- a/turbo.json
+++ b/turbo.json
@@ -33,7 +33,8 @@
     "GOOGLE_GENAI_API_KEY",
     "GOOGLE_GENAI_USE_VERTEXAI",
     "GOOGLE_CLOUD_PROJECT",
-    "GOOGLE_CLOUD_LOCATION"
+    "GOOGLE_CLOUD_LOCATION",
+    "LIVEKIT_ENABLE_CONSOLE_MODE"
   ],
   "pipeline": {
     "build": {