Skip to content

Commit

Permalink
modify some data structure
Browse files Browse the repository at this point in the history
  • Loading branch information
kota-yata committed Sep 23, 2024
1 parent 6ffcb74 commit 2f8105b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 43 deletions.
45 changes: 7 additions & 38 deletions src/lib/moq/moqt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ export class MOQT {
private senderState: SenderState = {};
private inflightRequests: string[] = [];
public trackManager: TrackManager;
constructor(url: string) {
this.wt = new WebTransport(url, { congestionControl: 'throughput' });
constructor(props: { url: string, maxInflightRequests?: number }) {
this.wt = new WebTransport(props.url, { congestionControl: 'throughput' }); // 'throughput' or 'low-latency' although only Firefox supports 'low-latency'
this.trackManager = new TrackManager();
if (props.maxInflightRequests) this.MAX_INFLIGHT_REQUESTS = props.maxInflightRequests;
}
public async initControlStream() {
await this.wt.ready;
Expand All @@ -31,43 +32,11 @@ export class MOQT {
}

public getIncomingStream(): ReadableStream { return this.wt.incomingUnidirectionalStreams; }
public getControlWriter(): WritableStream { return this.controlWriter; }
public getControlReader(): ReadableStream { return this.controlReader; }
private async send(props: { writerStream: WritableStream, dataBytes: Uint8Array }) {
const writer = props.writerStream.getWriter();
await writer.write(props.dataBytes);
writer.releaseLock();
}
// Start as a subscriber
public async startSubscriber(props: { namespace: string, videoTrackName: string, audioTrackName: string, secret: string }) {
await this.setup({ role: MOQ_PARAMETER_ROLE.SUBSCRIBER });
await this.readSetup();

await this.subscribe({
subscribeId: 0,
namespace: props.namespace,
trackName: props.videoTrackName,
authInfo: props.secret
});
const typeVideo = await varIntToNumber(this.controlReader);
if (typeVideo === MOQ_MESSAGE.SUBSCRIBE_ERROR) {
const error = await this.readSubscribeError();
throw new Error(`SUBSCRIBE error: ${error.errorCode} ${error.reasonPhrase}`);
} else if (typeVideo !== MOQ_MESSAGE.SUBSCRIBE_OK) {
throw new Error(`Unhandlable SUBSCRIBE response type: ${typeVideo}`);
}
const subscribeResponseVideo = await this.readSubscribeResponse();
this.trackManager.addTrack({
namespace: props.namespace,
name: props.videoTrackName,
subscribeIds: [subscribeResponseVideo.subscribeId],
type: 'video',
priority: 2,
});
}
public async stopSubscriber() {
await this.unsubscribe(0); // TODO: unsubscribe all. also manage subscribe ids
}
// read message type
public async readControlMessageType(): Promise<number> {
return await varIntToNumber(this.controlReader);
Expand Down Expand Up @@ -192,24 +161,24 @@ export class MOQT {
return ret;
}
private generateSubscribeUpdateMessage() {}
private async readSubscribeError() {
public async readSubscribeError() {
const subscribeId = await varIntToNumber(this.controlReader);
const errorCode = await varIntToNumber(this.controlReader);
const reasonPhrase = await toString(this.controlReader);
const trackAlias = await varIntToNumber(this.controlReader);
return { subscribeId, errorCode, reasonPhrase, trackAlias };
}
private readSubscribeDone() {}
public readSubscribeDone() {}
private generateUnsubscribeMessage(subscribeId: number) {
const messageTypeBytes = numberToVarInt(MOQ_MESSAGE.UNSUBSCRIBE);
const subscribeIdBytes = numberToVarInt(subscribeId);
return concatBuffer([messageTypeBytes, subscribeIdBytes]);
}
private async unsubscribe(subscribeId: number) {
public async unsubscribe(subscribeId: number) {
const unsubscribeMessage = this.generateUnsubscribeMessage(subscribeId);
await this.send({ writerStream: this.controlWriter, dataBytes: unsubscribeMessage });
}
public async readUnsubscribe() {
public async readUnsubscribe () {
const subscribeId = await varIntToNumber(this.controlReader);
return { subscribeId };
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib/moq/publish/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class Publisher {
audioChunkCount: number;
private moqt: MOQT;
constructor(url: string) {
this.moqt = new MOQT(url);
this.moqt = new MOQT({ url });
this.videoChunkCount = 0;
this.audioChunkCount = 0;
}
Expand Down
34 changes: 30 additions & 4 deletions src/lib/moq/subscribe/subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AUDIO_DECODER_DEFAULT_CONFIG, VIDEO_DECODER_DEFAULT_CONFIG } from '../constants';
import { AUDIO_DECODER_DEFAULT_CONFIG, MOQ_MESSAGE, MOQ_PARAMETER_ROLE, VIDEO_DECODER_DEFAULT_CONFIG } from '../constants';
import { LOC } from '../loc';
import { MitterMuffer } from '../mitter-muffer';
import { MOQT } from '../moqt';
Expand All @@ -18,7 +18,7 @@ export class Subscriber {
private videoJitterBuffer: MitterMuffer;
private audioJitterBuffer: MitterMuffer;
constructor(url: string) {
this.moqt = new MOQT(url);
this.moqt = new MOQT({ url });
this.vDecoder = new VideoDecoder({
output: (frame) => this.handleVideoFrame(frame),
error: (error: DOMException) => this.mogger.error(error.message)
Expand All @@ -33,14 +33,40 @@ export class Subscriber {
}
public async init(props: { namespace: string, videoTrackName: string, audioTrackName: string, authInfo: string, jitterBufferFrameSize: number }) {
await this.moqt.initControlStream();
await this.moqt.startSubscriber({ ...props, secret: props.authInfo });
await this.startSubscriber({ ...props, secret: props.authInfo });
this.startLoopObject();
this.videoJitterBuffer = new MitterMuffer(props.jitterBufferFrameSize);
this.audioJitterBuffer = new MitterMuffer(props.jitterBufferFrameSize);
}
public async startSubscriber(props: { namespace: string, videoTrackName: string, audioTrackName: string, secret: string }) {
await this.moqt.setup({ role: MOQ_PARAMETER_ROLE.SUBSCRIBER });
await this.moqt.readSetup();

await this.moqt.subscribe({
subscribeId: 0,
namespace: props.namespace,
trackName: props.videoTrackName,
authInfo: props.secret
});
const typeVideo = await this.moqt.readControlMessageType();
if (typeVideo === MOQ_MESSAGE.SUBSCRIBE_ERROR) {
const error = await this.moqt.readSubscribeError();
throw new Error(`SUBSCRIBE error: ${error.errorCode} ${error.reasonPhrase}`);
} else if (typeVideo !== MOQ_MESSAGE.SUBSCRIBE_OK) {
throw new Error(`Unhandlable SUBSCRIBE response type: ${typeVideo}`);
}
const subscribeResponseVideo = await this.moqt.readSubscribeResponse();
this.moqt.trackManager.addTrack({
namespace: props.namespace,
name: props.videoTrackName,
subscribeIds: [subscribeResponseVideo.subscribeId],
type: 'video',
priority: 2,
});
}
public async stop() {
// unsubscribe but keep the control stream
await this.moqt.stopSubscriber();
await this.moqt.unsubscribe(0); // TODO: unsubscribe all. also manage subscribe ids
}
public setCanvasElement(canvasElement: HTMLCanvasElement) {
this.canvasElement = canvasElement;
Expand Down

0 comments on commit 2f8105b

Please sign in to comment.