Skip to content

Commit

Permalink
DC messages (#40)
Browse files Browse the repository at this point in the history
* Implement data channel message type

* Better logging

* Update

* Ignore jest setup files

* [MM-60561] RTC client metrics (#41)

* Client metrics

* Improve loss detection in case of senders of loss
  • Loading branch information
streamer45 authored Oct 9, 2024
1 parent 93c0c8b commit 2ad11b3
Show file tree
Hide file tree
Showing 24 changed files with 424 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
run: |
mkdir -p tmp
npx tsc --outDir tmp
diff -r lib tmp
diff -x 'setup_jest*' -r lib tmp
rm -rf tmp
test:
runs-on: ubuntu-22.04
Expand Down
7 changes: 7 additions & 0 deletions lib/dc_msg.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { Encoder, Decoder } from '@msgpack/msgpack';
import { DCMessageType } from './types';
export declare function encodeDCMsg(enc: Encoder, msgType: DCMessageType, payload?: any): Uint8Array;
export declare function decodeDCMsg(dec: Decoder, data: Uint8Array): {
mt: unknown;
payload: unknown;
};
41 changes: 41 additions & 0 deletions lib/dc_msg.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { zlibSync, unzlibSync, strToU8, strFromU8 } from 'fflate';
import { DCMessageType } from './types';
export function encodeDCMsg(enc, msgType, payload) {
const mt = enc.encode(msgType);
if (typeof payload === 'undefined') {
return mt;
}
let pl;
if (msgType === DCMessageType.SDP) {
pl = enc.encode(zlibSync(strToU8(JSON.stringify(payload))));
}
else {
pl = enc.encode(payload);
}
// Flat encoding
const msg = new Uint8Array(mt.byteLength + pl.byteLength);
msg.set(mt);
msg.set(pl, mt.byteLength);
return msg;
}
export function decodeDCMsg(dec, data) {
let mt;
let payload;
let i = 0;
// Messages are expected to be flat (no surrounding object).
// We also support payload-less messages (e.g. ping/pong).
for (const val of dec.decodeMulti(data)) {
if (i === 0) {
mt = val;
}
else if (i === 1) {
payload = val;
break;
}
i++;
}
if (mt === DCMessageType.SDP) {
payload = strFromU8(unzlibSync(payload));
}
return { mt, payload };
}
34 changes: 29 additions & 5 deletions lib/rtc_monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, ge
});
};
import { EventEmitter } from 'events';
import { newRTCLocalInboundStats, newRTCLocalOutboundStats, newRTCRemoteInboundStats, newRTCCandidatePairStats } from './rtc_stats';
import { newRTCLocalInboundStats, newRTCLocalOutboundStats, newRTCRemoteInboundStats, newRTCRemoteOutboundStats, newRTCCandidatePairStats } from './rtc_stats';
export const mosThreshold = 3.5;
export class RTCMonitor extends EventEmitter {
constructor(cfg) {
Expand All @@ -28,6 +28,7 @@ export class RTCMonitor extends EventEmitter {
lastLocalIn: {},
lastLocalOut: {},
lastRemoteIn: {},
lastRemoteOut: {},
};
}
start() {
Expand All @@ -37,7 +38,7 @@ export class RTCMonitor extends EventEmitter {
this.logger.logDebug('RTCMonitor: starting');
this.intervalID = setInterval(this.gatherStats, this.cfg.monitorInterval);
}
getLocalInQualityStats(localIn) {
getLocalInQualityStats(localIn, remoteOut) {
const stats = {};
let totalTime = 0;
let totalPacketsReceived = 0;
Expand All @@ -53,7 +54,23 @@ export class RTCMonitor extends EventEmitter {
}
const tsDiff = stat.timestamp - this.stats.lastLocalIn[ssrc].timestamp;
const receivedDiff = stat.packetsReceived - this.stats.lastLocalIn[ssrc].packetsReceived;
const lostDiff = stat.packetsLost - this.stats.lastLocalIn[ssrc].packetsLost;
// Tracking loss on the receiving end is a bit more tricky because packets are
// forwarded without much modification by the server so if the sender is having issues, these are
// propagated to the receiver side which may believe it's having problems as a consequence.
//
// What we want to know instead is whether the local side is having issues on the
// server -> receiver path rather than sender -> server -> receiver one.
// To do this we check for any mismatches in packets sent by the remote and packets
// received by us.
//
// Note: it's expected for local.packetsReceived to be slightly higher than remote.packetsSent
// since reports are generated at different times, with the local one likely being more time-accurate.
//
// Having remote.packetsSent higher than local.packetsReceived is instead a fairly good sign
// some packets have been lost in transit.
const potentiallyLost = remoteOut[ssrc].packetsSent - stat.packetsReceived;
const prevPotentiallyLost = this.stats.lastRemoteOut[ssrc].packetsSent - this.stats.lastLocalIn[ssrc].packetsReceived;
const lostDiff = prevPotentiallyLost >= 0 && potentiallyLost > prevPotentiallyLost ? potentiallyLost - prevPotentiallyLost : 0;
totalTime += tsDiff;
totalPacketsReceived += receivedDiff;
totalPacketsLost += lostDiff;
Expand Down Expand Up @@ -88,7 +105,7 @@ export class RTCMonitor extends EventEmitter {
totalTime += tsDiff;
totalRemoteJitter += stat.jitter;
totalRTT += stat.roundTripTime;
totalLossRate = stat.fractionLost;
totalLossRate += stat.fractionLost;
totalRemoteStats++;
}
if (totalRemoteStats > 0) {
Expand All @@ -103,6 +120,7 @@ export class RTCMonitor extends EventEmitter {
const localIn = {};
const localOut = {};
const remoteIn = {};
const remoteOut = {};
let candidate;
reports.forEach((report) => {
// Collect necessary stats to make further calculations:
Expand All @@ -123,6 +141,9 @@ export class RTCMonitor extends EventEmitter {
if (report.type === 'remote-inbound-rtp' && report.kind === 'audio') {
remoteIn[report.ssrc] = newRTCRemoteInboundStats(report);
}
if (report.type === 'remote-outbound-rtp' && report.kind === 'audio') {
remoteOut[report.ssrc] = newRTCRemoteOutboundStats(report);
}
});
if (!candidate) {
this.logger.logDebug('RTCMonitor: no valid candidate was found');
Expand All @@ -135,14 +156,15 @@ export class RTCMonitor extends EventEmitter {
transportLatency = (candidate.currentRoundTripTime * 1000) / 2;
}
// Step 2: if receiving any stream, calculate average jitter and loss rate using local stats.
const localInStats = this.getLocalInQualityStats(localIn);
const localInStats = this.getLocalInQualityStats(localIn, remoteOut);
// Step 3: if sending any stream, calculate average latency, jitter and
// loss rate using remote stats.
const remoteInStats = this.getRemoteInQualityStats(remoteIn, localOut);
// Step 4: cache current stats for calculating deltas on next iteration.
this.stats.lastLocalIn = Object.assign({}, localIn);
this.stats.lastLocalOut = Object.assign({}, localOut);
this.stats.lastRemoteIn = Object.assign({}, remoteIn);
this.stats.lastRemoteOut = Object.assign({}, remoteOut);
if (typeof transportLatency === 'undefined' && typeof remoteInStats.avgLatency === 'undefined') {
transportLatency = this.peer.getRTT() / 2;
}
Expand All @@ -160,6 +182,7 @@ export class RTCMonitor extends EventEmitter {
// Step 5 (or the magic step): calculate MOS (Mean Opinion Score)
const mos = this.calculateMOS(latency, jitter, lossRate);
this.emit('mos', mos);
this.peer.handleMetrics(lossRate, jitter / 1000);
this.logger.logDebug(`RTCMonitor: MOS --> ${mos}`);
}
calculateMOS(latency, jitter, lossRate) {
Expand Down Expand Up @@ -197,6 +220,7 @@ export class RTCMonitor extends EventEmitter {
lastLocalIn: {},
lastLocalOut: {},
lastRemoteIn: {},
lastRemoteOut: {},
};
}
}
3 changes: 3 additions & 0 deletions lib/rtc_peer.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ export declare class RTCPeer extends EventEmitter {
private dc;
private readonly senders;
private readonly logger;
private enc;
private dec;
private pingIntervalID;
private connTimeoutID;
private rtt;
Expand All @@ -30,6 +32,7 @@ export declare class RTCPeer extends EventEmitter {
replaceTrack(oldTrackID: string, newTrack: MediaStreamTrack | null): void;
removeTrack(trackID: string): void;
getStats(): Promise<RTCStatsReport>;
handleMetrics(lossRate: number, jitter: number): void;
static getVideoCodec(mimeType: string): Promise<RTCRtpCodecCapability | null>;
destroy(): void;
}
64 changes: 51 additions & 13 deletions lib/rtc_peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, ge
});
};
import { EventEmitter } from 'events';
import { Encoder, Decoder } from '@msgpack/msgpack';
import { DCMessageType } from './types';
import { encodeDCMsg, decodeDCMsg } from './dc_msg';
import { isFirefox, getFirefoxVersion } from './utils';
const rtcConnFailedErr = new Error('rtc connection failed');
const rtcConnTimeoutMsDefault = 15 * 1000;
Expand Down Expand Up @@ -43,6 +46,8 @@ export class RTCPeer extends EventEmitter {
this.pc.oniceconnectionstatechange = () => this.onICEConnectionStateChange();
this.pc.onconnectionstatechange = () => this.onConnectionStateChange();
this.pc.ontrack = (ev) => this.onTrack(ev);
this.enc = new Encoder();
this.dec = new Decoder();
this.connected = false;
const connTimeout = config.connTimeoutMs || rtcConnTimeoutMsDefault;
this.connTimeoutID = setTimeout(() => {
Expand All @@ -55,19 +60,32 @@ export class RTCPeer extends EventEmitter {
// - Calculate transport latency through simple ping/pong sequences.
// - Use this communication channel for further negotiation (to be implemented).
this.dc = this.pc.createDataChannel('calls-dc');
this.dc.binaryType = 'arraybuffer';
this.dc.onmessage = (ev) => this.dcHandler(ev);
this.pingIntervalID = this.initPingHandler();
this.logger.logDebug('RTCPeer: created new client', JSON.stringify(config));
}
dcHandler(ev) {
if (ev.data === 'pong' && this.lastPingTS > 0) {
this.rtt = (performance.now() - this.lastPingTS) / 1000;
try {
const { mt, payload } = decodeDCMsg(this.dec, ev.data);
switch (mt) {
case DCMessageType.Pong:
if (this.lastPingTS > 0) {
this.rtt = (performance.now() - this.lastPingTS) / 1000;
}
break;
case DCMessageType.SDP:
this.logger.logDebug('RTCPeer.dcHandler: received sdp dc message');
this.signal(payload).catch((err) => {
this.logger.logErr('RTCPeer.dcHandler: failed to signal sdp', err);
});
break;
default:
this.logger.logWarn(`RTCPeer.dcHandler: unexpected dc message type ${mt}`);
}
}
else if (ev.data !== 'pong') {
this.logger.logDebug('RTCPeer.dcHandler: received sdp through DC');
this.signal(ev.data).catch((err) => {
this.logger.logErr('RTCPeer.dcHandler: failed to signal sdp', err);
});
catch (err) {
this.logger.logErr('failed to decode dc message', err);
}
}
initPingHandler() {
Expand All @@ -76,7 +94,7 @@ export class RTCPeer extends EventEmitter {
return;
}
this.lastPingTS = performance.now();
this.dc.send('ping');
this.dc.send(encodeDCMsg(this.enc, DCMessageType.Ping));
}, pingIntervalMs);
}
getRTT() {
Expand All @@ -87,6 +105,7 @@ export class RTCPeer extends EventEmitter {
}
onICECandidate(ev) {
if (ev.candidate) {
this.logger.logDebug('RTCPeer.onICECandidate: local candidate', JSON.stringify(ev.candidate));
this.emit('candidate', ev.candidate);
}
}
Expand Down Expand Up @@ -119,18 +138,19 @@ export class RTCPeer extends EventEmitter {
try {
this.makingOffer = true;
yield ((_a = this.pc) === null || _a === void 0 ? void 0 : _a.setLocalDescription());
this.logger.logDebug('RTCPeer.onNegotiationNeeded: generated local offer', JSON.stringify((_b = this.pc) === null || _b === void 0 ? void 0 : _b.localDescription));
if (this.config.dcSignaling && this.dc.readyState === 'open') {
this.logger.logDebug('connected, sending offer through data channel', (_b = this.pc) === null || _b === void 0 ? void 0 : _b.localDescription);
this.logger.logDebug('connected, sending offer through data channel');
try {
this.dc.send(JSON.stringify((_c = this.pc) === null || _c === void 0 ? void 0 : _c.localDescription));
this.dc.send(encodeDCMsg(this.enc, DCMessageType.SDP, (_c = this.pc) === null || _c === void 0 ? void 0 : _c.localDescription));
}
catch (err) {
this.logger.logErr('failed to send on datachannel', err);
}
}
else {
if (this.config.dcSignaling) {
this.logger.logDebug('dc not connected, emitting offer', this.dc.readyState);
this.logger.logDebug('dc not connected, emitting offer');
}
this.emit('offer', (_d = this.pc) === null || _d === void 0 ? void 0 : _d.localDescription);
}
Expand Down Expand Up @@ -178,6 +198,7 @@ export class RTCPeer extends EventEmitter {
if (!this.pc) {
throw new Error('peer has been destroyed');
}
this.logger.logDebug('RTCPeer.signal: handling remote signaling data', data);
const msg = JSON.parse(data);
if (msg.type === 'offer' && (this.makingOffer || ((_a = this.pc) === null || _a === void 0 ? void 0 : _a.signalingState) !== 'stable')) {
this.logger.logDebug('RTCPeer.signal: signaling conflict, we are polite, proceeding...');
Expand All @@ -203,18 +224,19 @@ export class RTCPeer extends EventEmitter {
this.flushICECandidates();
}
yield this.pc.setLocalDescription();
this.logger.logDebug('RTCPeer.signal: generated local answer', JSON.stringify(this.pc.localDescription));
if (this.config.dcSignaling && this.dc.readyState === 'open') {
this.logger.logDebug('connected, sending answer through data channel', this.pc.localDescription);
try {
this.dc.send(JSON.stringify(this.pc.localDescription));
this.dc.send(encodeDCMsg(this.enc, DCMessageType.SDP, this.pc.localDescription));
}
catch (err) {
this.logger.logErr('failed to send on datachannel', err);
}
}
else {
if (this.config.dcSignaling) {
this.logger.logDebug('dc not connected yet, emitting answer', this.dc.readyState);
this.logger.logDebug('dc not connected, emitting answer');
}
this.emit('answer', this.pc.localDescription);
}
Expand Down Expand Up @@ -319,6 +341,22 @@ export class RTCPeer extends EventEmitter {
}
return this.pc.getStats(null);
}
handleMetrics(lossRate, jitter) {
try {
if (lossRate >= 0) {
this.dc.send(encodeDCMsg(this.enc, DCMessageType.LossRate, lossRate));
}
if (this.rtt > 0) {
this.dc.send(encodeDCMsg(this.enc, DCMessageType.RoundTripTime, this.rtt));
}
if (jitter > 0) {
this.dc.send(encodeDCMsg(this.enc, DCMessageType.Jitter, jitter));
}
}
catch (err) {
this.logger.logErr('failed to send metrics through dc', err);
}
}
static getVideoCodec(mimeType) {
return __awaiter(this, void 0, void 0, function* () {
if (RTCRtpReceiver.getCapabilities) {
Expand Down
6 changes: 6 additions & 0 deletions lib/rtc_stats.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ export declare function newRTCRemoteInboundStats(report: any): {
jitter: any;
roundTripTime: any;
};
export declare function newRTCRemoteOutboundStats(report: any): {
timestamp: any;
kind: any;
packetsSent: any;
bytesSent: any;
};
export declare function newRTCCandidatePairStats(report: any, reports: RTCStatsReport): RTCCandidatePairStats;
export declare function parseSSRCStats(reports: RTCStatsReport): SSRCStats;
export declare function parseICEStats(reports: RTCStatsReport): ICEStats;
Expand Down
15 changes: 9 additions & 6 deletions lib/rtc_stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ export function newRTCRemoteInboundStats(report) {
roundTripTime: report.roundTripTime,
};
}
export function newRTCRemoteOutboundStats(report) {
return {
timestamp: report.timestamp,
kind: report.kind,
packetsSent: report.packetsSent,
bytesSent: report.bytesSent,
};
}
export function newRTCCandidatePairStats(report, reports) {
let local;
let remote;
Expand Down Expand Up @@ -88,12 +96,7 @@ export function parseSSRCStats(reports) {
stats[report.ssrc].remote.in = newRTCRemoteInboundStats(report);
break;
case 'remote-outbound-rtp':
stats[report.ssrc].remote.out = {
timestamp: report.timestamp,
kind: report.kind,
packetsSent: report.packetsSent,
bytesSent: report.bytesSent,
};
stats[report.ssrc].remote.out = newRTCRemoteOutboundStats(report);
break;
}
});
Expand Down
4 changes: 4 additions & 0 deletions lib/setup_jest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { TextEncoder, TextDecoder } from 'util';
global.TextEncoder = TextEncoder;
// @ts-ignore
global.TextDecoder = TextDecoder;
12 changes: 12 additions & 0 deletions lib/types/dc_msg.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export declare enum DCMessageType {
Ping = 1,
Pong = 2,
SDP = 3,
LossRate = 4,
RoundTripTime = 5,
Jitter = 6
}
export type DCMessageSDP = Uint8Array;
export type DCMessageLossRate = number;
export type DCMessageRoundTripTime = number;
export type DCMessageJitter = number;
Loading

0 comments on commit 2ad11b3

Please sign in to comment.