From 2f8105bfe849720b4b0b6a340982a092699eed89 Mon Sep 17 00:00:00 2001 From: kota-yata Date: Mon, 23 Sep 2024 15:10:21 -0700 Subject: [PATCH] modify some data structure --- src/lib/moq/moqt.ts | 45 +++++------------------------ src/lib/moq/publish/publisher.ts | 2 +- src/lib/moq/subscribe/subscriber.ts | 34 +++++++++++++++++++--- 3 files changed, 38 insertions(+), 43 deletions(-) diff --git a/src/lib/moq/moqt.ts b/src/lib/moq/moqt.ts index ebaf2ec..69db648 100644 --- a/src/lib/moq/moqt.ts +++ b/src/lib/moq/moqt.ts @@ -19,9 +19,10 @@ export class MOQT { private senderState: SenderState = {}; private inflightRequests: string[] = []; public trackManager: TrackManager; - constructor(url: string) { - this.wt = new WebTransport(url, { congestionControl: 'throughput' }); + constructor(props: { url: string, maxInflightRequests?: number }) { + this.wt = new WebTransport(props.url, { congestionControl: 'throughput' }); // 'throughput' or 'low-latency' although only Firefox supports 'low-latency' this.trackManager = new TrackManager(); + if (props.maxInflightRequests) this.MAX_INFLIGHT_REQUESTS = props.maxInflightRequests; } public async initControlStream() { await this.wt.ready; @@ -31,43 +32,11 @@ export class MOQT { } public getIncomingStream(): ReadableStream { return this.wt.incomingUnidirectionalStreams; } - public getControlWriter(): WritableStream { return this.controlWriter; } - public getControlReader(): ReadableStream { return this.controlReader; } private async send(props: { writerStream: WritableStream, dataBytes: Uint8Array }) { const writer = props.writerStream.getWriter(); await writer.write(props.dataBytes); writer.releaseLock(); } - // Start as a subscriber - public async startSubscriber(props: { namespace: string, videoTrackName: string, audioTrackName: string, secret: string }) { - await this.setup({ role: MOQ_PARAMETER_ROLE.SUBSCRIBER }); - await this.readSetup(); - - await this.subscribe({ - subscribeId: 0, - namespace: props.namespace, - trackName: props.videoTrackName, - authInfo: props.secret - }); - const typeVideo = await varIntToNumber(this.controlReader); - if (typeVideo === MOQ_MESSAGE.SUBSCRIBE_ERROR) { - const error = await this.readSubscribeError(); - throw new Error(`SUBSCRIBE error: ${error.errorCode} ${error.reasonPhrase}`); - } else if (typeVideo !== MOQ_MESSAGE.SUBSCRIBE_OK) { - throw new Error(`Unhandlable SUBSCRIBE response type: ${typeVideo}`); - } - const subscribeResponseVideo = await this.readSubscribeResponse(); - this.trackManager.addTrack({ - namespace: props.namespace, - name: props.videoTrackName, - subscribeIds: [subscribeResponseVideo.subscribeId], - type: 'video', - priority: 2, - }); - } - public async stopSubscriber() { - await this.unsubscribe(0); // TODO: unsubscribe all. also manage subscribe ids - } // read message type public async readControlMessageType(): Promise { return await varIntToNumber(this.controlReader); @@ -192,24 +161,24 @@ export class MOQT { return ret; } private generateSubscribeUpdateMessage() {} - private async readSubscribeError() { + public async readSubscribeError() { const subscribeId = await varIntToNumber(this.controlReader); const errorCode = await varIntToNumber(this.controlReader); const reasonPhrase = await toString(this.controlReader); const trackAlias = await varIntToNumber(this.controlReader); return { subscribeId, errorCode, reasonPhrase, trackAlias }; } - private readSubscribeDone() {} + public readSubscribeDone() {} private generateUnsubscribeMessage(subscribeId: number) { const messageTypeBytes = numberToVarInt(MOQ_MESSAGE.UNSUBSCRIBE); const subscribeIdBytes = numberToVarInt(subscribeId); return concatBuffer([messageTypeBytes, subscribeIdBytes]); } - private async unsubscribe(subscribeId: number) { + public async unsubscribe(subscribeId: number) { const unsubscribeMessage = this.generateUnsubscribeMessage(subscribeId); await this.send({ writerStream: this.controlWriter, dataBytes: unsubscribeMessage }); } - public async readUnsubscribe() { + public async readUnsubscribe () { const subscribeId = await varIntToNumber(this.controlReader); return { subscribeId }; } diff --git a/src/lib/moq/publish/publisher.ts b/src/lib/moq/publish/publisher.ts index 8ade49c..2a82d81 100644 --- a/src/lib/moq/publish/publisher.ts +++ b/src/lib/moq/publish/publisher.ts @@ -32,7 +32,7 @@ export class Publisher { audioChunkCount: number; private moqt: MOQT; constructor(url: string) { - this.moqt = new MOQT(url); + this.moqt = new MOQT({ url }); this.videoChunkCount = 0; this.audioChunkCount = 0; } diff --git a/src/lib/moq/subscribe/subscriber.ts b/src/lib/moq/subscribe/subscriber.ts index a21b486..e1978b6 100644 --- a/src/lib/moq/subscribe/subscriber.ts +++ b/src/lib/moq/subscribe/subscriber.ts @@ -1,4 +1,4 @@ -import { AUDIO_DECODER_DEFAULT_CONFIG, VIDEO_DECODER_DEFAULT_CONFIG } from '../constants'; +import { AUDIO_DECODER_DEFAULT_CONFIG, MOQ_MESSAGE, MOQ_PARAMETER_ROLE, VIDEO_DECODER_DEFAULT_CONFIG } from '../constants'; import { LOC } from '../loc'; import { MitterMuffer } from '../mitter-muffer'; import { MOQT } from '../moqt'; @@ -18,7 +18,7 @@ export class Subscriber { private videoJitterBuffer: MitterMuffer; private audioJitterBuffer: MitterMuffer; constructor(url: string) { - this.moqt = new MOQT(url); + this.moqt = new MOQT({ url }); this.vDecoder = new VideoDecoder({ output: (frame) => this.handleVideoFrame(frame), error: (error: DOMException) => this.mogger.error(error.message) @@ -33,14 +33,40 @@ export class Subscriber { } public async init(props: { namespace: string, videoTrackName: string, audioTrackName: string, authInfo: string, jitterBufferFrameSize: number }) { await this.moqt.initControlStream(); - await this.moqt.startSubscriber({ ...props, secret: props.authInfo }); + await this.startSubscriber({ ...props, secret: props.authInfo }); this.startLoopObject(); this.videoJitterBuffer = new MitterMuffer(props.jitterBufferFrameSize); this.audioJitterBuffer = new MitterMuffer(props.jitterBufferFrameSize); } + public async startSubscriber(props: { namespace: string, videoTrackName: string, audioTrackName: string, secret: string }) { + await this.moqt.setup({ role: MOQ_PARAMETER_ROLE.SUBSCRIBER }); + await this.moqt.readSetup(); + + await this.moqt.subscribe({ + subscribeId: 0, + namespace: props.namespace, + trackName: props.videoTrackName, + authInfo: props.secret + }); + const typeVideo = await this.moqt.readControlMessageType(); + if (typeVideo === MOQ_MESSAGE.SUBSCRIBE_ERROR) { + const error = await this.moqt.readSubscribeError(); + throw new Error(`SUBSCRIBE error: ${error.errorCode} ${error.reasonPhrase}`); + } else if (typeVideo !== MOQ_MESSAGE.SUBSCRIBE_OK) { + throw new Error(`Unhandlable SUBSCRIBE response type: ${typeVideo}`); + } + const subscribeResponseVideo = await this.moqt.readSubscribeResponse(); + this.moqt.trackManager.addTrack({ + namespace: props.namespace, + name: props.videoTrackName, + subscribeIds: [subscribeResponseVideo.subscribeId], + type: 'video', + priority: 2, + }); + } public async stop() { // unsubscribe but keep the control stream - await this.moqt.stopSubscriber(); + await this.moqt.unsubscribe(0); // TODO: unsubscribe all. also manage subscribe ids } public setCanvasElement(canvasElement: HTMLCanvasElement) { this.canvasElement = canvasElement;