Skip to content

Commit

Permalink
make AudioStream use AsyncIterableIterator
Browse files Browse the repository at this point in the history
instead of having a typed EventEmitter
  • Loading branch information
nbsp committed Sep 19, 2024
1 parent 39d29a2 commit aadbf24
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 18 deletions.
44 changes: 27 additions & 17 deletions packages/livekit-rtc/src/audio_stream.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,28 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import EventEmitter from 'events';
import { AudioFrame } from './audio_frame.js';
import type { FfiEvent } from './ffi_client.js';
import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js';
import type { AudioStreamInfo, NewAudioStreamResponse } from './proto/audio_frame_pb.js';
import { AudioStreamType, NewAudioStreamRequest } from './proto/audio_frame_pb.js';
import type { Track } from './track.js';

export type AudioFrameEvent = {
frame: AudioFrame;
};

export type AudioStreamCallbacks = {
frameReceived: (frame: AudioFrameEvent) => void;
};

export enum AudioStreamEvent {
FrameReceived = 'frameReceived',
}

export class AudioStream extends (EventEmitter as new () => TypedEmitter<AudioStreamCallbacks>) {
export class AudioStream implements AsyncIterableIterator<AudioFrame> {
/** @internal */
info: AudioStreamInfo;
/** @internal */
ffiHandle: FfiHandle;
/** @internal */
eventQueue: (AudioFrame | null)[] = [];
/** @internal */
queueResolve: ((value: IteratorResult<AudioFrame>) => void) | null = null;

track: Track;
sampleRate: number;
numChannels: number;

constructor(track: Track, sampleRate: number = 48000, numChannels: number = 1) {
super();
this.track = track;
this.sampleRate = sampleRate;
this.numChannels = numChannels;
Expand Down Expand Up @@ -70,15 +59,36 @@ export class AudioStream extends (EventEmitter as new () => TypedEmitter<AudioSt
switch (streamEvent.case) {
case 'frameReceived':
const frame = AudioFrame.fromOwnedInfo(streamEvent.value.frame);
this.emit(AudioStreamEvent.FrameReceived, { frame });
if (this.queueResolve) {
this.queueResolve({ done: false, value: frame });
} else {
this.eventQueue.push(frame);
}
break;
case 'eos':
FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);
break;
}
};

async next(): Promise<IteratorResult<AudioFrame>> {
if (this.eventQueue.length > 0) {
const value = this.eventQueue.shift();
if (value) {
return { done: false, value };
} else {
return { done: true, value: undefined };
}
}
return new Promise((resolve) => (this.queueResolve = resolve));
}

close() {
this.eventQueue.push(null);
this.ffiHandle.dispose();
}

[Symbol.asyncIterator](): AudioStream {
return this;
}
}
2 changes: 1 addition & 1 deletion packages/livekit-rtc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export {
} from './track.js';
export { VideoFrame } from './video_frame.js';
export { AudioFrame } from './audio_frame.js';
export { AudioStream, AudioStreamEvent, AudioFrameEvent } from './audio_stream.js';
export { AudioStream } from './audio_stream.js';
export { VideoStream, VideoStreamEvent, VideoFrameEvent } from './video_stream.js';
export { AudioSource } from './audio_source.js';
export { VideoSource } from './video_source.js';
Expand Down

0 comments on commit aadbf24

Please sign in to comment.