Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{Audio,Video}Stream: use AsyncIterableIterator #272

Merged
merged 4 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/real-toes-accept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/rtc-node": minor
---

make AudioStream use AsyncIterableIterator instead of typed EventEmitter
44 changes: 27 additions & 17 deletions packages/livekit-rtc/src/audio_stream.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,28 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import EventEmitter from 'events';
import { AudioFrame } from './audio_frame.js';
import type { FfiEvent } from './ffi_client.js';
import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js';
import type { AudioStreamInfo, NewAudioStreamResponse } from './proto/audio_frame_pb.js';
import { AudioStreamType, NewAudioStreamRequest } from './proto/audio_frame_pb.js';
import type { Track } from './track.js';

export type AudioFrameEvent = {
frame: AudioFrame;
};

export type AudioStreamCallbacks = {
frameReceived: (frame: AudioFrameEvent) => void;
};

export enum AudioStreamEvent {
FrameReceived = 'frameReceived',
}

export class AudioStream extends (EventEmitter as new () => TypedEmitter<AudioStreamCallbacks>) {
export class AudioStream implements AsyncIterableIterator<AudioFrame> {
/** @internal */
info: AudioStreamInfo;
/** @internal */
ffiHandle: FfiHandle;
/** @internal */
eventQueue: (AudioFrame | null)[] = [];
/** @internal */
queueResolve: ((value: IteratorResult<AudioFrame>) => void) | null = null;

track: Track;
sampleRate: number;
numChannels: number;

constructor(track: Track, sampleRate: number = 48000, numChannels: number = 1) {
super();
this.track = track;
this.sampleRate = sampleRate;
this.numChannels = numChannels;
Expand Down Expand Up @@ -70,15 +59,36 @@ export class AudioStream extends (EventEmitter as new () => TypedEmitter<AudioSt
switch (streamEvent.case) {
case 'frameReceived':
const frame = AudioFrame.fromOwnedInfo(streamEvent.value.frame);
this.emit(AudioStreamEvent.FrameReceived, { frame });
if (this.queueResolve) {
this.queueResolve({ done: false, value: frame });
} else {
this.eventQueue.push(frame);
}
break;
case 'eos':
FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);
break;
}
};

async next(): Promise<IteratorResult<AudioFrame>> {
if (this.eventQueue.length > 0) {
const value = this.eventQueue.shift();
if (value) {
return { done: false, value };
} else {
return { done: true, value: undefined };
}
}
return new Promise((resolve) => (this.queueResolve = resolve));
Copy link
Contributor

@lukasIO lukasIO Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this.queueResolve might get overridden before being called?
so the async iterator could get stuck

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the Mutex helper that Théo has mentioned already might make sense here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could it? i don't think that's the case, it only gets overridden if you call next() twice without awaiting the first one, which you shouldn't do anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which you shouldn't do anyway.

agreed that it shouldn't be done, but seems like a footgun regardless?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to return a copy of the same promise if next is called twice?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, I think best case scenario is a promise chain, which is exactly what the Mutex class does.

}

close() {
this.eventQueue.push(null);
this.ffiHandle.dispose();
}

[Symbol.asyncIterator](): AudioStream {
return this;
}
}
2 changes: 1 addition & 1 deletion packages/livekit-rtc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export {
} from './track.js';
export { VideoFrame } from './video_frame.js';
export { AudioFrame } from './audio_frame.js';
export { AudioStream, AudioStreamEvent, AudioFrameEvent } from './audio_stream.js';
export { AudioStream } from './audio_stream.js';
export { VideoStream, VideoStreamEvent, VideoFrameEvent } from './video_stream.js';
export { AudioSource } from './audio_source.js';
export { VideoSource } from './video_source.js';
Expand Down
Loading