Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 14 additions & 21 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { StreamSfuClient } from './StreamSfuClient';
import {
BasePeerConnectionOpts,
Dispatcher,
getGenericSdp,
isAudioTrackType,
Expand Down Expand Up @@ -118,6 +119,7 @@ import {
ClientDetails,
Codec,
ParticipantSource,
PeerType,
PublishOption,
SubscribeOption,
TrackType,
Expand Down Expand Up @@ -987,9 +989,11 @@ export class Call {
// prepare a generic SDP and send it to the SFU.
// these are throw-away SDPs that the SFU will use to determine
// the capabilities of the client (codec support, etc.)
const { dangerouslyForceCodec, fmtpLine, subscriberFmtpLine } =
this.clientPublishOptions || {};
const [subscriberSdp, publisherSdp] = await Promise.all([
getGenericSdp('recvonly'),
getGenericSdp('sendonly'),
getGenericSdp('recvonly', dangerouslyForceCodec, subscriberFmtpLine),
getGenericSdp('sendonly', dangerouslyForceCodec, fmtpLine),
]);
const isReconnecting =
this.reconnectStrategy !== WebsocketReconnectStrategy.UNSPECIFIED;
Expand Down Expand Up @@ -1237,20 +1241,23 @@ export class Call {
if (closePreviousInstances && this.subscriber) {
this.subscriber.dispose();
}
this.subscriber = new Subscriber({
const basePeerConnectionOptions: BasePeerConnectionOpts = {
sfuClient,
dispatcher: this.dispatcher,
state: this.state,
connectionConfig,
tag: sfuClient.tag,
enableTracing,
onReconnectionNeeded: (kind, reason) => {
clientPublishOptions: this.clientPublishOptions,
onReconnectionNeeded: (kind, reason, peerType) => {
this.reconnect(kind, reason).catch((err) => {
const message = `[Reconnect] Error reconnecting after a subscriber error: ${reason}`;
const message = `[Reconnect] Error reconnecting, after a ${PeerType[peerType]} error: ${reason}`;
this.logger.warn(message, err);
});
},
});
};

this.subscriber = new Subscriber(basePeerConnectionOptions);

// anonymous users can't publish anything hence, there is no need
// to create Publisher Peer Connection for them
Expand All @@ -1259,21 +1266,7 @@ export class Call {
if (closePreviousInstances && this.publisher) {
this.publisher.dispose();
}
this.publisher = new Publisher({
sfuClient,
dispatcher: this.dispatcher,
state: this.state,
connectionConfig,
publishOptions,
tag: sfuClient.tag,
enableTracing,
onReconnectionNeeded: (kind, reason) => {
this.reconnect(kind, reason).catch((err) => {
const message = `[Reconnect] Error reconnecting after a publisher error: ${reason}`;
this.logger.warn(message, err);
});
},
});
this.publisher = new Publisher(basePeerConnectionOptions, publishOptions);
}

this.statsReporter?.stop();
Expand Down
9 changes: 7 additions & 2 deletions packages/client/src/rtc/BasePeerConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import { StreamSfuClient } from '../StreamSfuClient';
import { AllSfuEvents, Dispatcher } from './Dispatcher';
import { withoutConcurrency } from '../helpers/concurrency';
import { StatsTracer, Tracer, traceRTCPeerConnection } from '../stats';
import { BasePeerConnectionOpts, OnReconnectionNeeded } from './types';
import type { BasePeerConnectionOpts, OnReconnectionNeeded } from './types';
import type { ClientPublishOptions } from '../types';

/**
* A base class for the `Publisher` and `Subscriber` classes.
Expand All @@ -25,6 +26,7 @@ export abstract class BasePeerConnection {
protected readonly pc: RTCPeerConnection;
protected readonly state: CallState;
protected readonly dispatcher: Dispatcher;
protected readonly clientPublishOptions?: ClientPublishOptions;
protected sfuClient: StreamSfuClient;

private onReconnectionNeeded?: OnReconnectionNeeded;
Expand Down Expand Up @@ -55,6 +57,7 @@ export abstract class BasePeerConnection {
onReconnectionNeeded,
tag,
enableTracing,
clientPublishOptions,
iceRestartDelay = 2500,
}: BasePeerConnectionOpts,
) {
Expand All @@ -63,6 +66,7 @@ export abstract class BasePeerConnection {
this.state = state;
this.dispatcher = dispatcher;
this.iceRestartDelay = iceRestartDelay;
this.clientPublishOptions = clientPublishOptions;
this.onReconnectionNeeded = onReconnectionNeeded;
this.logger = videoLoggerSystem.getLogger(
peerType === PeerType.SUBSCRIBER ? 'Subscriber' : 'Publisher',
Expand Down Expand Up @@ -145,7 +149,7 @@ export abstract class BasePeerConnection {
e.error.code === ErrorCode.PARTICIPANT_SIGNAL_LOST
? WebsocketReconnectStrategy.FAST
: WebsocketReconnectStrategy.REJOIN;
this.onReconnectionNeeded?.(strategy, reason);
this.onReconnectionNeeded?.(strategy, reason, this.peerType);
});
};

Expand Down Expand Up @@ -284,6 +288,7 @@ export abstract class BasePeerConnection {
this.onReconnectionNeeded?.(
WebsocketReconnectStrategy.REJOIN,
'Connection failed',
this.peerType,
);
return;
}
Expand Down
18 changes: 13 additions & 5 deletions packages/client/src/rtc/Publisher.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { BasePeerConnection } from './BasePeerConnection';
import {
import type {
BasePeerConnectionOpts,
PublishBundle,
PublisherConstructorOpts,
TrackPublishOptions,
} from './types';
import { NegotiationError } from './NegotiationError';
Expand All @@ -21,7 +21,7 @@ import {
} from './layers';
import { isSvcCodec } from './codecs';
import { isAudioTrackType } from './helpers/tracks';
import { extractMid } from './helpers/sdp';
import { extractMid, removeCodecsExcept } from './helpers/sdp';
import { withoutConcurrency } from '../helpers/concurrency';
import { isReactNative } from '../helpers/platforms';

Expand All @@ -38,7 +38,10 @@ export class Publisher extends BasePeerConnection {
/**
* Constructs a new `Publisher` instance.
*/
constructor({ publishOptions, ...baseOptions }: PublisherConstructorOpts) {
constructor(
baseOptions: BasePeerConnectionOpts,
publishOptions: PublishOption[],
) {
super(PeerType.PUBLISHER_UNSPECIFIED, baseOptions);
this.publishOptions = publishOptions;

Expand Down Expand Up @@ -378,7 +381,12 @@ export class Publisher extends BasePeerConnection {
this.isIceRestarting = options?.iceRestart ?? false;
await this.pc.setLocalDescription(offer);

const { sdp = '' } = offer;
const { sdp: baseSdp = '' } = offer;
const { dangerouslyForceCodec, fmtpLine } =
this.clientPublishOptions || {};
const sdp = dangerouslyForceCodec
? removeCodecsExcept(baseSdp, dangerouslyForceCodec, fmtpLine)
: baseSdp;
const { response } = await this.sfuClient.setPublisher({ sdp, tracks });
if (response.error) throw new NegotiationError(response.error);

Expand Down
11 changes: 10 additions & 1 deletion packages/client/src/rtc/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { NegotiationError } from './NegotiationError';
import { PeerType } from '../gen/video/sfu/models/models';
import { SubscriberOffer } from '../gen/video/sfu/event/events';
import { toTrackType, trackTypeToParticipantStreamKey } from './helpers/tracks';
import { enableStereo } from './helpers/sdp';
import { enableStereo, removeCodecsExcept } from './helpers/sdp';

/**
* A wrapper around the `RTCPeerConnection` that handles the incoming
Expand Down Expand Up @@ -153,6 +153,15 @@ export class Subscriber extends BasePeerConnection {
const answer = await this.pc.createAnswer();
if (answer.sdp) {
answer.sdp = enableStereo(subscriberOffer.sdp, answer.sdp);
const { dangerouslyForceCodec, subscriberFmtpLine } =
this.clientPublishOptions || {};
if (dangerouslyForceCodec) {
answer.sdp = removeCodecsExcept(
answer.sdp,
dangerouslyForceCodec,
subscriberFmtpLine,
);
}
}
await this.pc.setLocalDescription(answer);

Expand Down
20 changes: 12 additions & 8 deletions packages/client/src/rtc/__tests__/Publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ describe('Publisher', () => {
sfuClient['sessionId'] = sessionId;

state = new CallState();
publisher = new Publisher({
sfuClient,
dispatcher,
state,
tag: 'test',
enableTracing: false,
publishOptions: [
publisher = new Publisher(
{
sfuClient,
dispatcher,
state,
tag: 'test',
enableTracing: false,
},
[
{
id: 1,
trackType: TrackType.VIDEO,
Expand All @@ -81,7 +83,7 @@ describe('Publisher', () => {
maxSpatialLayers: 3,
},
],
});
);
});

afterEach(() => {
Expand Down Expand Up @@ -309,6 +311,7 @@ describe('Publisher', () => {
expect(publisher['onReconnectionNeeded']).toHaveBeenCalledWith(
WebsocketReconnectStrategy.FAST,
anyString(),
PeerType.PUBLISHER_UNSPECIFIED,
);

expect(pc.setLocalDescription).toHaveBeenCalledTimes(2);
Expand Down Expand Up @@ -354,6 +357,7 @@ describe('Publisher', () => {
expect(publisher['onReconnectionNeeded']).toHaveBeenCalledWith(
WebsocketReconnectStrategy.REJOIN,
anyString(),
PeerType.PUBLISHER_UNSPECIFIED,
);
});

Expand Down
15 changes: 13 additions & 2 deletions packages/client/src/rtc/codecs.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
import { removeCodecsExcept } from './helpers/sdp';

/**
* Returns a generic SDP for the given direction.
* We use this SDP to send it as part of our JoinRequest so that the SFU
* can use it to determine the client's codec capabilities.
*
* @param direction the direction of the transceiver.
* @param codecToKeep the codec mime type to keep (video/h264 or audio/opus).
* @param fmtpProfileToKeep optional fmtp profile to keep.
*/
export const getGenericSdp = async (direction: RTCRtpTransceiverDirection) => {
export const getGenericSdp = async (
direction: RTCRtpTransceiverDirection,
codecToKeep: string | undefined,
fmtpProfileToKeep: string | undefined,
) => {
const tempPc = new RTCPeerConnection();
tempPc.addTransceiver('video', { direction });
tempPc.addTransceiver('audio', { direction });

const offer = await tempPc.createOffer();
const sdp = offer.sdp ?? '';
const { sdp: baseSdp = '' } = offer;
const sdp = codecToKeep
? removeCodecsExcept(baseSdp, codecToKeep, fmtpProfileToKeep)
: baseSdp;

tempPc.getTransceivers().forEach((t) => {
t.stop?.();
Expand Down
Loading
Loading