From 2ad11b3c3ce3dfafa8170116fe6fe078bad82e7a Mon Sep 17 00:00:00 2001 From: Claudio Costa Date: Wed, 9 Oct 2024 15:31:31 -0600 Subject: [PATCH] DC messages (#40) * Implement data channel message type * Better logging * Update * Ignore jest setup files * [MM-60561] RTC client metrics (#41) * Client metrics * Improve loss detection in case of senders of loss --- .github/workflows/ci.yml | 2 +- lib/dc_msg.d.ts | 7 ++++ lib/dc_msg.js | 41 ++++++++++++++++++++++ lib/rtc_monitor.js | 34 ++++++++++++++++--- lib/rtc_peer.d.ts | 3 ++ lib/rtc_peer.js | 64 ++++++++++++++++++++++++++++------- lib/rtc_stats.d.ts | 6 ++++ lib/rtc_stats.js | 15 +++++---- lib/setup_jest.js | 4 +++ lib/types/dc_msg.d.ts | 12 +++++++ lib/types/dc_msg.js | 9 +++++ lib/types/index.d.ts | 1 + lib/types/index.js | 1 + package-lock.json | 27 +++++++++++++++ package.json | 7 +++- src/dc_msg.test.ts | 39 +++++++++++++++++++++ src/dc_msg.ts | 49 +++++++++++++++++++++++++++ src/rtc_monitor.ts | 45 +++++++++++++++++++++---- src/rtc_peer.ts | 73 ++++++++++++++++++++++++++++++++-------- src/rtc_stats.ts | 16 +++++---- src/setup_jest.ts | 6 ++++ src/types/dc_msg.ts | 13 +++++++ src/types/index.ts | 1 + tsconfig.json | 2 +- 24 files changed, 424 insertions(+), 53 deletions(-) create mode 100644 lib/dc_msg.d.ts create mode 100644 lib/dc_msg.js create mode 100644 lib/setup_jest.js create mode 100644 lib/types/dc_msg.d.ts create mode 100644 lib/types/dc_msg.js create mode 100644 src/dc_msg.test.ts create mode 100644 src/dc_msg.ts create mode 100644 src/setup_jest.ts create mode 100644 src/types/dc_msg.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2950d2e..98c6817 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -68,7 +68,7 @@ jobs: run: | mkdir -p tmp npx tsc --outDir tmp - diff -r lib tmp + diff -x 'setup_jest*' -r lib tmp rm -rf tmp test: runs-on: ubuntu-22.04 diff --git a/lib/dc_msg.d.ts b/lib/dc_msg.d.ts new file mode 100644 index 0000000..b471a02 --- /dev/null +++ b/lib/dc_msg.d.ts @@ -0,0 +1,7 @@ +import { Encoder, Decoder } from '@msgpack/msgpack'; +import { DCMessageType } from './types'; +export declare function encodeDCMsg(enc: Encoder, msgType: DCMessageType, payload?: any): Uint8Array; +export declare function decodeDCMsg(dec: Decoder, data: Uint8Array): { + mt: unknown; + payload: unknown; +}; diff --git a/lib/dc_msg.js b/lib/dc_msg.js new file mode 100644 index 0000000..0dd06f4 --- /dev/null +++ b/lib/dc_msg.js @@ -0,0 +1,41 @@ +import { zlibSync, unzlibSync, strToU8, strFromU8 } from 'fflate'; +import { DCMessageType } from './types'; +export function encodeDCMsg(enc, msgType, payload) { + const mt = enc.encode(msgType); + if (typeof payload === 'undefined') { + return mt; + } + let pl; + if (msgType === DCMessageType.SDP) { + pl = enc.encode(zlibSync(strToU8(JSON.stringify(payload)))); + } + else { + pl = enc.encode(payload); + } + // Flat encoding + const msg = new Uint8Array(mt.byteLength + pl.byteLength); + msg.set(mt); + msg.set(pl, mt.byteLength); + return msg; +} +export function decodeDCMsg(dec, data) { + let mt; + let payload; + let i = 0; + // Messages are expected to be flat (no surrounding object). + // We also support payload-less messages (e.g. ping/pong). + for (const val of dec.decodeMulti(data)) { + if (i === 0) { + mt = val; + } + else if (i === 1) { + payload = val; + break; + } + i++; + } + if (mt === DCMessageType.SDP) { + payload = strFromU8(unzlibSync(payload)); + } + return { mt, payload }; +} diff --git a/lib/rtc_monitor.js b/lib/rtc_monitor.js index 7c8de24..58b52ec 100644 --- a/lib/rtc_monitor.js +++ b/lib/rtc_monitor.js @@ -8,7 +8,7 @@ var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, ge }); }; import { EventEmitter } from 'events'; -import { newRTCLocalInboundStats, newRTCLocalOutboundStats, newRTCRemoteInboundStats, newRTCCandidatePairStats } from './rtc_stats'; +import { newRTCLocalInboundStats, newRTCLocalOutboundStats, newRTCRemoteInboundStats, newRTCRemoteOutboundStats, newRTCCandidatePairStats } from './rtc_stats'; export const mosThreshold = 3.5; export class RTCMonitor extends EventEmitter { constructor(cfg) { @@ -28,6 +28,7 @@ export class RTCMonitor extends EventEmitter { lastLocalIn: {}, lastLocalOut: {}, lastRemoteIn: {}, + lastRemoteOut: {}, }; } start() { @@ -37,7 +38,7 @@ export class RTCMonitor extends EventEmitter { this.logger.logDebug('RTCMonitor: starting'); this.intervalID = setInterval(this.gatherStats, this.cfg.monitorInterval); } - getLocalInQualityStats(localIn) { + getLocalInQualityStats(localIn, remoteOut) { const stats = {}; let totalTime = 0; let totalPacketsReceived = 0; @@ -53,7 +54,23 @@ export class RTCMonitor extends EventEmitter { } const tsDiff = stat.timestamp - this.stats.lastLocalIn[ssrc].timestamp; const receivedDiff = stat.packetsReceived - this.stats.lastLocalIn[ssrc].packetsReceived; - const lostDiff = stat.packetsLost - this.stats.lastLocalIn[ssrc].packetsLost; + // Tracking loss on the receiving end is a bit more tricky because packets are + // forwarded without much modification by the server so if the sender is having issues, these are + // propagated to the receiver side which may believe it's having problems as a consequence. + // + // What we want to know instead is whether the local side is having issues on the + // server -> receiver path rather than sender -> server -> receiver one. + // To do this we check for any mismatches in packets sent by the remote and packets + // received by us. + // + // Note: it's expected for local.packetsReceived to be slightly higher than remote.packetsSent + // since reports are generated at different times, with the local one likely being more time-accurate. + // + // Having remote.packetsSent higher than local.packetsReceived is instead a fairly good sign + // some packets have been lost in transit. + const potentiallyLost = remoteOut[ssrc].packetsSent - stat.packetsReceived; + const prevPotentiallyLost = this.stats.lastRemoteOut[ssrc].packetsSent - this.stats.lastLocalIn[ssrc].packetsReceived; + const lostDiff = prevPotentiallyLost >= 0 && potentiallyLost > prevPotentiallyLost ? potentiallyLost - prevPotentiallyLost : 0; totalTime += tsDiff; totalPacketsReceived += receivedDiff; totalPacketsLost += lostDiff; @@ -88,7 +105,7 @@ export class RTCMonitor extends EventEmitter { totalTime += tsDiff; totalRemoteJitter += stat.jitter; totalRTT += stat.roundTripTime; - totalLossRate = stat.fractionLost; + totalLossRate += stat.fractionLost; totalRemoteStats++; } if (totalRemoteStats > 0) { @@ -103,6 +120,7 @@ export class RTCMonitor extends EventEmitter { const localIn = {}; const localOut = {}; const remoteIn = {}; + const remoteOut = {}; let candidate; reports.forEach((report) => { // Collect necessary stats to make further calculations: @@ -123,6 +141,9 @@ export class RTCMonitor extends EventEmitter { if (report.type === 'remote-inbound-rtp' && report.kind === 'audio') { remoteIn[report.ssrc] = newRTCRemoteInboundStats(report); } + if (report.type === 'remote-outbound-rtp' && report.kind === 'audio') { + remoteOut[report.ssrc] = newRTCRemoteOutboundStats(report); + } }); if (!candidate) { this.logger.logDebug('RTCMonitor: no valid candidate was found'); @@ -135,7 +156,7 @@ export class RTCMonitor extends EventEmitter { transportLatency = (candidate.currentRoundTripTime * 1000) / 2; } // Step 2: if receiving any stream, calculate average jitter and loss rate using local stats. - const localInStats = this.getLocalInQualityStats(localIn); + const localInStats = this.getLocalInQualityStats(localIn, remoteOut); // Step 3: if sending any stream, calculate average latency, jitter and // loss rate using remote stats. const remoteInStats = this.getRemoteInQualityStats(remoteIn, localOut); @@ -143,6 +164,7 @@ export class RTCMonitor extends EventEmitter { this.stats.lastLocalIn = Object.assign({}, localIn); this.stats.lastLocalOut = Object.assign({}, localOut); this.stats.lastRemoteIn = Object.assign({}, remoteIn); + this.stats.lastRemoteOut = Object.assign({}, remoteOut); if (typeof transportLatency === 'undefined' && typeof remoteInStats.avgLatency === 'undefined') { transportLatency = this.peer.getRTT() / 2; } @@ -160,6 +182,7 @@ export class RTCMonitor extends EventEmitter { // Step 5 (or the magic step): calculate MOS (Mean Opinion Score) const mos = this.calculateMOS(latency, jitter, lossRate); this.emit('mos', mos); + this.peer.handleMetrics(lossRate, jitter / 1000); this.logger.logDebug(`RTCMonitor: MOS --> ${mos}`); } calculateMOS(latency, jitter, lossRate) { @@ -197,6 +220,7 @@ export class RTCMonitor extends EventEmitter { lastLocalIn: {}, lastLocalOut: {}, lastRemoteIn: {}, + lastRemoteOut: {}, }; } } diff --git a/lib/rtc_peer.d.ts b/lib/rtc_peer.d.ts index f0c4c9b..f755cd5 100644 --- a/lib/rtc_peer.d.ts +++ b/lib/rtc_peer.d.ts @@ -7,6 +7,8 @@ export declare class RTCPeer extends EventEmitter { private dc; private readonly senders; private readonly logger; + private enc; + private dec; private pingIntervalID; private connTimeoutID; private rtt; @@ -30,6 +32,7 @@ export declare class RTCPeer extends EventEmitter { replaceTrack(oldTrackID: string, newTrack: MediaStreamTrack | null): void; removeTrack(trackID: string): void; getStats(): Promise; + handleMetrics(lossRate: number, jitter: number): void; static getVideoCodec(mimeType: string): Promise; destroy(): void; } diff --git a/lib/rtc_peer.js b/lib/rtc_peer.js index 875178e..fdbae24 100644 --- a/lib/rtc_peer.js +++ b/lib/rtc_peer.js @@ -8,6 +8,9 @@ var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, ge }); }; import { EventEmitter } from 'events'; +import { Encoder, Decoder } from '@msgpack/msgpack'; +import { DCMessageType } from './types'; +import { encodeDCMsg, decodeDCMsg } from './dc_msg'; import { isFirefox, getFirefoxVersion } from './utils'; const rtcConnFailedErr = new Error('rtc connection failed'); const rtcConnTimeoutMsDefault = 15 * 1000; @@ -43,6 +46,8 @@ export class RTCPeer extends EventEmitter { this.pc.oniceconnectionstatechange = () => this.onICEConnectionStateChange(); this.pc.onconnectionstatechange = () => this.onConnectionStateChange(); this.pc.ontrack = (ev) => this.onTrack(ev); + this.enc = new Encoder(); + this.dec = new Decoder(); this.connected = false; const connTimeout = config.connTimeoutMs || rtcConnTimeoutMsDefault; this.connTimeoutID = setTimeout(() => { @@ -55,19 +60,32 @@ export class RTCPeer extends EventEmitter { // - Calculate transport latency through simple ping/pong sequences. // - Use this communication channel for further negotiation (to be implemented). this.dc = this.pc.createDataChannel('calls-dc'); + this.dc.binaryType = 'arraybuffer'; this.dc.onmessage = (ev) => this.dcHandler(ev); this.pingIntervalID = this.initPingHandler(); this.logger.logDebug('RTCPeer: created new client', JSON.stringify(config)); } dcHandler(ev) { - if (ev.data === 'pong' && this.lastPingTS > 0) { - this.rtt = (performance.now() - this.lastPingTS) / 1000; + try { + const { mt, payload } = decodeDCMsg(this.dec, ev.data); + switch (mt) { + case DCMessageType.Pong: + if (this.lastPingTS > 0) { + this.rtt = (performance.now() - this.lastPingTS) / 1000; + } + break; + case DCMessageType.SDP: + this.logger.logDebug('RTCPeer.dcHandler: received sdp dc message'); + this.signal(payload).catch((err) => { + this.logger.logErr('RTCPeer.dcHandler: failed to signal sdp', err); + }); + break; + default: + this.logger.logWarn(`RTCPeer.dcHandler: unexpected dc message type ${mt}`); + } } - else if (ev.data !== 'pong') { - this.logger.logDebug('RTCPeer.dcHandler: received sdp through DC'); - this.signal(ev.data).catch((err) => { - this.logger.logErr('RTCPeer.dcHandler: failed to signal sdp', err); - }); + catch (err) { + this.logger.logErr('failed to decode dc message', err); } } initPingHandler() { @@ -76,7 +94,7 @@ export class RTCPeer extends EventEmitter { return; } this.lastPingTS = performance.now(); - this.dc.send('ping'); + this.dc.send(encodeDCMsg(this.enc, DCMessageType.Ping)); }, pingIntervalMs); } getRTT() { @@ -87,6 +105,7 @@ export class RTCPeer extends EventEmitter { } onICECandidate(ev) { if (ev.candidate) { + this.logger.logDebug('RTCPeer.onICECandidate: local candidate', JSON.stringify(ev.candidate)); this.emit('candidate', ev.candidate); } } @@ -119,10 +138,11 @@ export class RTCPeer extends EventEmitter { try { this.makingOffer = true; yield ((_a = this.pc) === null || _a === void 0 ? void 0 : _a.setLocalDescription()); + this.logger.logDebug('RTCPeer.onNegotiationNeeded: generated local offer', JSON.stringify((_b = this.pc) === null || _b === void 0 ? void 0 : _b.localDescription)); if (this.config.dcSignaling && this.dc.readyState === 'open') { - this.logger.logDebug('connected, sending offer through data channel', (_b = this.pc) === null || _b === void 0 ? void 0 : _b.localDescription); + this.logger.logDebug('connected, sending offer through data channel'); try { - this.dc.send(JSON.stringify((_c = this.pc) === null || _c === void 0 ? void 0 : _c.localDescription)); + this.dc.send(encodeDCMsg(this.enc, DCMessageType.SDP, (_c = this.pc) === null || _c === void 0 ? void 0 : _c.localDescription)); } catch (err) { this.logger.logErr('failed to send on datachannel', err); @@ -130,7 +150,7 @@ export class RTCPeer extends EventEmitter { } else { if (this.config.dcSignaling) { - this.logger.logDebug('dc not connected, emitting offer', this.dc.readyState); + this.logger.logDebug('dc not connected, emitting offer'); } this.emit('offer', (_d = this.pc) === null || _d === void 0 ? void 0 : _d.localDescription); } @@ -178,6 +198,7 @@ export class RTCPeer extends EventEmitter { if (!this.pc) { throw new Error('peer has been destroyed'); } + this.logger.logDebug('RTCPeer.signal: handling remote signaling data', data); const msg = JSON.parse(data); if (msg.type === 'offer' && (this.makingOffer || ((_a = this.pc) === null || _a === void 0 ? void 0 : _a.signalingState) !== 'stable')) { this.logger.logDebug('RTCPeer.signal: signaling conflict, we are polite, proceeding...'); @@ -203,10 +224,11 @@ export class RTCPeer extends EventEmitter { this.flushICECandidates(); } yield this.pc.setLocalDescription(); + this.logger.logDebug('RTCPeer.signal: generated local answer', JSON.stringify(this.pc.localDescription)); if (this.config.dcSignaling && this.dc.readyState === 'open') { this.logger.logDebug('connected, sending answer through data channel', this.pc.localDescription); try { - this.dc.send(JSON.stringify(this.pc.localDescription)); + this.dc.send(encodeDCMsg(this.enc, DCMessageType.SDP, this.pc.localDescription)); } catch (err) { this.logger.logErr('failed to send on datachannel', err); @@ -214,7 +236,7 @@ export class RTCPeer extends EventEmitter { } else { if (this.config.dcSignaling) { - this.logger.logDebug('dc not connected yet, emitting answer', this.dc.readyState); + this.logger.logDebug('dc not connected, emitting answer'); } this.emit('answer', this.pc.localDescription); } @@ -319,6 +341,22 @@ export class RTCPeer extends EventEmitter { } return this.pc.getStats(null); } + handleMetrics(lossRate, jitter) { + try { + if (lossRate >= 0) { + this.dc.send(encodeDCMsg(this.enc, DCMessageType.LossRate, lossRate)); + } + if (this.rtt > 0) { + this.dc.send(encodeDCMsg(this.enc, DCMessageType.RoundTripTime, this.rtt)); + } + if (jitter > 0) { + this.dc.send(encodeDCMsg(this.enc, DCMessageType.Jitter, jitter)); + } + } + catch (err) { + this.logger.logErr('failed to send metrics through dc', err); + } + } static getVideoCodec(mimeType) { return __awaiter(this, void 0, void 0, function* () { if (RTCRtpReceiver.getCapabilities) { diff --git a/lib/rtc_stats.d.ts b/lib/rtc_stats.d.ts index 3671c39..a928ef8 100644 --- a/lib/rtc_stats.d.ts +++ b/lib/rtc_stats.d.ts @@ -33,6 +33,12 @@ export declare function newRTCRemoteInboundStats(report: any): { jitter: any; roundTripTime: any; }; +export declare function newRTCRemoteOutboundStats(report: any): { + timestamp: any; + kind: any; + packetsSent: any; + bytesSent: any; +}; export declare function newRTCCandidatePairStats(report: any, reports: RTCStatsReport): RTCCandidatePairStats; export declare function parseSSRCStats(reports: RTCStatsReport): SSRCStats; export declare function parseICEStats(reports: RTCStatsReport): ICEStats; diff --git a/lib/rtc_stats.js b/lib/rtc_stats.js index f38779d..947054f 100644 --- a/lib/rtc_stats.js +++ b/lib/rtc_stats.js @@ -40,6 +40,14 @@ export function newRTCRemoteInboundStats(report) { roundTripTime: report.roundTripTime, }; } +export function newRTCRemoteOutboundStats(report) { + return { + timestamp: report.timestamp, + kind: report.kind, + packetsSent: report.packetsSent, + bytesSent: report.bytesSent, + }; +} export function newRTCCandidatePairStats(report, reports) { let local; let remote; @@ -88,12 +96,7 @@ export function parseSSRCStats(reports) { stats[report.ssrc].remote.in = newRTCRemoteInboundStats(report); break; case 'remote-outbound-rtp': - stats[report.ssrc].remote.out = { - timestamp: report.timestamp, - kind: report.kind, - packetsSent: report.packetsSent, - bytesSent: report.bytesSent, - }; + stats[report.ssrc].remote.out = newRTCRemoteOutboundStats(report); break; } }); diff --git a/lib/setup_jest.js b/lib/setup_jest.js new file mode 100644 index 0000000..2654117 --- /dev/null +++ b/lib/setup_jest.js @@ -0,0 +1,4 @@ +import { TextEncoder, TextDecoder } from 'util'; +global.TextEncoder = TextEncoder; +// @ts-ignore +global.TextDecoder = TextDecoder; diff --git a/lib/types/dc_msg.d.ts b/lib/types/dc_msg.d.ts new file mode 100644 index 0000000..2f7cee9 --- /dev/null +++ b/lib/types/dc_msg.d.ts @@ -0,0 +1,12 @@ +export declare enum DCMessageType { + Ping = 1, + Pong = 2, + SDP = 3, + LossRate = 4, + RoundTripTime = 5, + Jitter = 6 +} +export type DCMessageSDP = Uint8Array; +export type DCMessageLossRate = number; +export type DCMessageRoundTripTime = number; +export type DCMessageJitter = number; diff --git a/lib/types/dc_msg.js b/lib/types/dc_msg.js new file mode 100644 index 0000000..31d629e --- /dev/null +++ b/lib/types/dc_msg.js @@ -0,0 +1,9 @@ +export var DCMessageType; +(function (DCMessageType) { + DCMessageType[DCMessageType["Ping"] = 1] = "Ping"; + DCMessageType[DCMessageType["Pong"] = 2] = "Pong"; + DCMessageType[DCMessageType["SDP"] = 3] = "SDP"; + DCMessageType[DCMessageType["LossRate"] = 4] = "LossRate"; + DCMessageType[DCMessageType["RoundTripTime"] = 5] = "RoundTripTime"; + DCMessageType[DCMessageType["Jitter"] = 6] = "Jitter"; +})(DCMessageType || (DCMessageType = {})); diff --git a/lib/types/index.d.ts b/lib/types/index.d.ts index 8a0d3d0..9b4360c 100644 --- a/lib/types/index.d.ts +++ b/lib/types/index.d.ts @@ -1,2 +1,3 @@ export * from './types'; export * from './webrtc'; +export * from './dc_msg'; diff --git a/lib/types/index.js b/lib/types/index.js index 8a0d3d0..9b4360c 100644 --- a/lib/types/index.js +++ b/lib/types/index.js @@ -1,2 +1,3 @@ export * from './types'; export * from './webrtc'; +export * from './dc_msg'; diff --git a/package-lock.json b/package-lock.json index 03153c0..479901a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7,6 +7,10 @@ "": { "name": "@mattermost/calls-common", "version": "0.27.2", + "dependencies": { + "@msgpack/msgpack": "^3.0.0-beta2", + "fflate": "^0.8.2" + }, "devDependencies": { "@babel/eslint-parser": "7.19.1", "@babel/preset-env": "7.16.4", @@ -2699,6 +2703,14 @@ "@jridgewell/sourcemap-codec": "1.4.14" } }, + "node_modules/@msgpack/msgpack": { + "version": "3.0.0-beta2", + "resolved": "https://registry.npmjs.org/@msgpack/msgpack/-/msgpack-3.0.0-beta2.tgz", + "integrity": "sha512-y+l1PNV0XDyY8sM3YtuMLK5vE3/hkfId+Do8pLo/OPxfxuFAUwcGz3oiiUuV46/aBpwTzZ+mRWVMtlSKbradhw==", + "engines": { + "node": ">= 14" + } + }, "node_modules/@nicolo-ribaudo/eslint-scope-5-internals": { "version": "5.1.1-v1", "resolved": "https://registry.npmjs.org/@nicolo-ribaudo/eslint-scope-5-internals/-/eslint-scope-5-internals-5.1.1-v1.tgz", @@ -5163,6 +5175,11 @@ "bser": "2.1.1" } }, + "node_modules/fflate": { + "version": "0.8.2", + "resolved": "https://registry.npmjs.org/fflate/-/fflate-0.8.2.tgz", + "integrity": "sha512-cPJU47OaAoCbg0pBvzsgpTPhmhqI5eJjh/JIu8tPj5q+T7iLvW/JAYUqmE7KOB4R1ZyEhzBaIQpQpardBF5z8A==" + }, "node_modules/file-entry-cache": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.1.tgz", @@ -11907,6 +11924,11 @@ "@jridgewell/sourcemap-codec": "1.4.14" } }, + "@msgpack/msgpack": { + "version": "3.0.0-beta2", + "resolved": "https://registry.npmjs.org/@msgpack/msgpack/-/msgpack-3.0.0-beta2.tgz", + "integrity": "sha512-y+l1PNV0XDyY8sM3YtuMLK5vE3/hkfId+Do8pLo/OPxfxuFAUwcGz3oiiUuV46/aBpwTzZ+mRWVMtlSKbradhw==" + }, "@nicolo-ribaudo/eslint-scope-5-internals": { "version": "5.1.1-v1", "resolved": "https://registry.npmjs.org/@nicolo-ribaudo/eslint-scope-5-internals/-/eslint-scope-5-internals-5.1.1-v1.tgz", @@ -13823,6 +13845,11 @@ "bser": "2.1.1" } }, + "fflate": { + "version": "0.8.2", + "resolved": "https://registry.npmjs.org/fflate/-/fflate-0.8.2.tgz", + "integrity": "sha512-cPJU47OaAoCbg0pBvzsgpTPhmhqI5eJjh/JIu8tPj5q+T7iLvW/JAYUqmE7KOB4R1ZyEhzBaIQpQpardBF5z8A==" + }, "file-entry-cache": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.1.tgz", diff --git a/package.json b/package.json index b962ac1..a01b785 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,11 @@ "webpack": "5.75.0" }, "jest": { - "testEnvironment": "jsdom" + "testEnvironment": "jsdom", + "setupFilesAfterEnv": ["/src/setup_jest.ts"] + }, + "dependencies": { + "@msgpack/msgpack": "^3.0.0-beta2", + "fflate": "^0.8.2" } } diff --git a/src/dc_msg.test.ts b/src/dc_msg.test.ts new file mode 100644 index 0000000..6a159db --- /dev/null +++ b/src/dc_msg.test.ts @@ -0,0 +1,39 @@ +import {expect} from '@jest/globals'; +import {Encoder, Decoder} from '@msgpack/msgpack'; + +import {DCMessageType} from './types'; +import {encodeDCMsg, decodeDCMsg} from './dc_msg'; + +describe('dcMsg', () => { + const enc = new Encoder(); + const dec = new Decoder(); + + it('ping', () => { + const pingMsg = encodeDCMsg(enc, DCMessageType.Ping); + expect(pingMsg).toEqual(new Uint8Array([DCMessageType.Ping])); + + const {mt, payload} = decodeDCMsg(dec, pingMsg); + expect(mt).toEqual(DCMessageType.Ping); + expect(payload).toBeUndefined(); + }); + + it('pong', () => { + const pongMsg = encodeDCMsg(enc, DCMessageType.Pong); + expect(pongMsg).toEqual(new Uint8Array([DCMessageType.Pong])); + + const {mt, payload} = decodeDCMsg(dec, pongMsg); + expect(mt).toEqual(DCMessageType.Pong); + expect(payload).toBeUndefined(); + }); + + it('sdp', () => { + const sdp = { + type: 'offer', + sdp: 'sdp', + }; + const sdpMsg = encodeDCMsg(enc, DCMessageType.SDP, sdp); + const {mt, payload} = decodeDCMsg(dec, sdpMsg); + expect(mt).toEqual(DCMessageType.SDP); + expect(JSON.parse(payload)).toEqual(sdp); + }); +}); diff --git a/src/dc_msg.ts b/src/dc_msg.ts new file mode 100644 index 0000000..a165acb --- /dev/null +++ b/src/dc_msg.ts @@ -0,0 +1,49 @@ +import {Encoder, Decoder} from '@msgpack/msgpack'; +import {zlibSync, unzlibSync, strToU8, strFromU8} from 'fflate'; + +import {DCMessageType, DCMessageSDP} from './types'; + +export function encodeDCMsg(enc: Encoder, msgType: DCMessageType, payload?: any) { + const mt = enc.encode(msgType); + if (typeof payload === 'undefined') { + return mt; + } + + let pl; + if (msgType === DCMessageType.SDP) { + pl = enc.encode(zlibSync(strToU8(JSON.stringify(payload)))); + } else { + pl = enc.encode(payload); + } + + // Flat encoding + const msg = new Uint8Array(mt.byteLength + pl.byteLength); + msg.set(mt); + msg.set(pl, mt.byteLength); + + return msg; +} + +export function decodeDCMsg(dec: Decoder, data: Uint8Array) { + let mt; + let payload; + let i = 0; + + // Messages are expected to be flat (no surrounding object). + // We also support payload-less messages (e.g. ping/pong). + for (const val of dec.decodeMulti(data)) { + if (i === 0) { + mt = val; + } else if (i === 1) { + payload = val; + break; + } + i++; + } + + if (mt === DCMessageType.SDP) { + payload = strFromU8(unzlibSync(payload as DCMessageSDP)); + } + + return {mt, payload}; +} diff --git a/src/rtc_monitor.ts b/src/rtc_monitor.ts index 1752a6c..69f392a 100644 --- a/src/rtc_monitor.ts +++ b/src/rtc_monitor.ts @@ -1,7 +1,7 @@ import {EventEmitter} from 'events'; -import {Logger, RTCMonitorConfig, RTCLocalInboundStats, RTCRemoteInboundStats, RTCLocalOutboundStats, RTCCandidatePairStats} from './types'; -import {newRTCLocalInboundStats, newRTCLocalOutboundStats, newRTCRemoteInboundStats, newRTCCandidatePairStats} from './rtc_stats'; +import {Logger, RTCMonitorConfig, RTCLocalInboundStats, RTCRemoteInboundStats, RTCRemoteOutboundStats, RTCLocalOutboundStats, RTCCandidatePairStats} from './types'; +import {newRTCLocalInboundStats, newRTCLocalOutboundStats, newRTCRemoteInboundStats, newRTCRemoteOutboundStats, newRTCCandidatePairStats} from './rtc_stats'; import {RTCPeer} from './rtc_peer'; export const mosThreshold = 3.5; @@ -18,10 +18,15 @@ type RemoteInboundStatsMap = { [key: string]: RTCRemoteInboundStats, }; +type RemoteOutboundStatsMap = { + [key: string]: RTCRemoteOutboundStats, +} + type MonitorStatsSample = { lastLocalIn: LocalInboundStatsMap, lastLocalOut: LocalOutboundStatsMap, lastRemoteIn: RemoteInboundStatsMap, + lastRemoteOut: RemoteOutboundStatsMap, }; type CallQualityStats = { @@ -48,6 +53,7 @@ export class RTCMonitor extends EventEmitter { lastLocalIn: {}, lastLocalOut: {}, lastRemoteIn: {}, + lastRemoteOut: {}, }; } @@ -69,7 +75,7 @@ export class RTCMonitor extends EventEmitter { }); }; - private getLocalInQualityStats(localIn: LocalInboundStatsMap) { + private getLocalInQualityStats(localIn: LocalInboundStatsMap, remoteOut: RemoteOutboundStatsMap) { const stats: CallQualityStats = {}; let totalTime = 0; @@ -89,7 +95,24 @@ export class RTCMonitor extends EventEmitter { const tsDiff = stat.timestamp - this.stats.lastLocalIn[ssrc].timestamp; const receivedDiff = stat.packetsReceived - this.stats.lastLocalIn[ssrc].packetsReceived; - const lostDiff = stat.packetsLost - this.stats.lastLocalIn[ssrc].packetsLost; + + // Tracking loss on the receiving end is a bit more tricky because packets are + // forwarded without much modification by the server so if the sender is having issues, these are + // propagated to the receiver side which may believe it's having problems as a consequence. + // + // What we want to know instead is whether the local side is having issues on the + // server -> receiver path rather than sender -> server -> receiver one. + // To do this we check for any mismatches in packets sent by the remote and packets + // received by us. + // + // Note: it's expected for local.packetsReceived to be slightly higher than remote.packetsSent + // since reports are generated at different times, with the local one likely being more time-accurate. + // + // Having remote.packetsSent higher than local.packetsReceived is instead a fairly good sign + // some packets have been lost in transit. + const potentiallyLost = remoteOut[ssrc].packetsSent - stat.packetsReceived; + const prevPotentiallyLost = this.stats.lastRemoteOut[ssrc].packetsSent - this.stats.lastLocalIn[ssrc].packetsReceived; + const lostDiff = prevPotentiallyLost >= 0 && potentiallyLost > prevPotentiallyLost ? potentiallyLost - prevPotentiallyLost : 0; totalTime += tsDiff; totalPacketsReceived += receivedDiff; @@ -131,7 +154,7 @@ export class RTCMonitor extends EventEmitter { totalTime += tsDiff; totalRemoteJitter += stat.jitter; totalRTT += stat.roundTripTime; - totalLossRate = stat.fractionLost; + totalLossRate += stat.fractionLost; totalRemoteStats++; } @@ -149,6 +172,7 @@ export class RTCMonitor extends EventEmitter { const localIn: LocalInboundStatsMap = {}; const localOut: LocalOutboundStatsMap = {}; const remoteIn: RemoteInboundStatsMap = {}; + const remoteOut: RemoteOutboundStatsMap = {}; let candidate: RTCCandidatePairStats | undefined; reports.forEach((report: any) => { // Collect necessary stats to make further calculations: @@ -173,6 +197,10 @@ export class RTCMonitor extends EventEmitter { if (report.type === 'remote-inbound-rtp' && report.kind === 'audio') { remoteIn[report.ssrc] = newRTCRemoteInboundStats(report); } + + if (report.type === 'remote-outbound-rtp' && report.kind === 'audio') { + remoteOut[report.ssrc] = newRTCRemoteOutboundStats(report); + } }); if (!candidate) { @@ -189,7 +217,7 @@ export class RTCMonitor extends EventEmitter { } // Step 2: if receiving any stream, calculate average jitter and loss rate using local stats. - const localInStats = this.getLocalInQualityStats(localIn); + const localInStats = this.getLocalInQualityStats(localIn, remoteOut); // Step 3: if sending any stream, calculate average latency, jitter and // loss rate using remote stats. @@ -205,6 +233,9 @@ export class RTCMonitor extends EventEmitter { this.stats.lastRemoteIn = { ...remoteIn, }; + this.stats.lastRemoteOut = { + ...remoteOut, + }; if (typeof transportLatency === 'undefined' && typeof remoteInStats.avgLatency === 'undefined') { transportLatency = this.peer.getRTT() / 2; @@ -227,6 +258,7 @@ export class RTCMonitor extends EventEmitter { // Step 5 (or the magic step): calculate MOS (Mean Opinion Score) const mos = this.calculateMOS(latency!, jitter, lossRate); this.emit('mos', mos); + this.peer.handleMetrics(lossRate, jitter / 1000); this.logger.logDebug(`RTCMonitor: MOS --> ${mos}`); } @@ -272,6 +304,7 @@ export class RTCMonitor extends EventEmitter { lastLocalIn: {}, lastLocalOut: {}, lastRemoteIn: {}, + lastRemoteOut: {}, }; } } diff --git a/src/rtc_peer.ts b/src/rtc_peer.ts index 1bda63d..d9b4eab 100644 --- a/src/rtc_peer.ts +++ b/src/rtc_peer.ts @@ -1,6 +1,10 @@ import {EventEmitter} from 'events'; -import {Logger, RTCPeerConfig, RTCTrackOptions} from './types'; +import {Encoder, Decoder} from '@msgpack/msgpack'; + +import {Logger, RTCPeerConfig, RTCTrackOptions, DCMessageType} from './types'; + +import {encodeDCMsg, decodeDCMsg} from './dc_msg'; import {isFirefox, getFirefoxVersion} from './utils'; @@ -28,6 +32,8 @@ export class RTCPeer extends EventEmitter { private dc: RTCDataChannel; private readonly senders: { [key: string]: RTCRtpSender[] }; private readonly logger: Logger; + private enc: Encoder; + private dec: Decoder; private pingIntervalID: ReturnType; private connTimeoutID: ReturnType; @@ -55,6 +61,9 @@ export class RTCPeer extends EventEmitter { this.pc.onconnectionstatechange = () => this.onConnectionStateChange(); this.pc.ontrack = (ev) => this.onTrack(ev); + this.enc = new Encoder(); + this.dec = new Decoder(); + this.connected = false; const connTimeout = config.connTimeoutMs || rtcConnTimeoutMsDefault; this.connTimeoutID = setTimeout(() => { @@ -68,6 +77,7 @@ export class RTCPeer extends EventEmitter { // - Calculate transport latency through simple ping/pong sequences. // - Use this communication channel for further negotiation (to be implemented). this.dc = this.pc.createDataChannel('calls-dc'); + this.dc.binaryType = 'arraybuffer'; this.dc.onmessage = (ev) => this.dcHandler(ev); this.pingIntervalID = this.initPingHandler(); @@ -76,13 +86,25 @@ export class RTCPeer extends EventEmitter { } private dcHandler(ev: MessageEvent) { - if (ev.data === 'pong' && this.lastPingTS > 0) { - this.rtt = (performance.now() - this.lastPingTS) / 1000; - } else if (ev.data !== 'pong') { - this.logger.logDebug('RTCPeer.dcHandler: received sdp through DC'); - this.signal(ev.data).catch((err) => { - this.logger.logErr('RTCPeer.dcHandler: failed to signal sdp', err); - }); + try { + const {mt, payload} = decodeDCMsg(this.dec, ev.data); + switch (mt) { + case DCMessageType.Pong: + if (this.lastPingTS > 0) { + this.rtt = (performance.now() - this.lastPingTS) / 1000; + } + break; + case DCMessageType.SDP: + this.logger.logDebug('RTCPeer.dcHandler: received sdp dc message'); + this.signal(payload as string).catch((err) => { + this.logger.logErr('RTCPeer.dcHandler: failed to signal sdp', err); + }); + break; + default: + this.logger.logWarn(`RTCPeer.dcHandler: unexpected dc message type ${mt}`); + } + } catch (err) { + this.logger.logErr('failed to decode dc message', err); } } @@ -92,7 +114,7 @@ export class RTCPeer extends EventEmitter { return; } this.lastPingTS = performance.now(); - this.dc.send('ping'); + this.dc.send(encodeDCMsg(this.enc, DCMessageType.Ping)); }, pingIntervalMs); } @@ -105,6 +127,7 @@ export class RTCPeer extends EventEmitter { private onICECandidate(ev: RTCPeerConnectionIceEvent) { if (ev.candidate) { + this.logger.logDebug('RTCPeer.onICECandidate: local candidate', JSON.stringify(ev.candidate)); this.emit('candidate', ev.candidate); } } @@ -137,16 +160,18 @@ export class RTCPeer extends EventEmitter { this.makingOffer = true; await this.pc?.setLocalDescription(); + this.logger.logDebug('RTCPeer.onNegotiationNeeded: generated local offer', JSON.stringify(this.pc?.localDescription)); + if (this.config.dcSignaling && this.dc.readyState === 'open') { - this.logger.logDebug('connected, sending offer through data channel', this.pc?.localDescription); + this.logger.logDebug('connected, sending offer through data channel'); try { - this.dc.send(JSON.stringify(this.pc?.localDescription)); + this.dc.send(encodeDCMsg(this.enc, DCMessageType.SDP, this.pc?.localDescription)); } catch (err) { this.logger.logErr('failed to send on datachannel', err); } } else { if (this.config.dcSignaling) { - this.logger.logDebug('dc not connected, emitting offer', this.dc.readyState); + this.logger.logDebug('dc not connected, emitting offer'); } this.emit('offer', this.pc?.localDescription); } @@ -193,6 +218,8 @@ export class RTCPeer extends EventEmitter { throw new Error('peer has been destroyed'); } + this.logger.logDebug('RTCPeer.signal: handling remote signaling data', data); + const msg = JSON.parse(data); if (msg.type === 'offer' && (this.makingOffer || this.pc?.signalingState !== 'stable')) { @@ -220,16 +247,18 @@ export class RTCPeer extends EventEmitter { } await this.pc.setLocalDescription(); + this.logger.logDebug('RTCPeer.signal: generated local answer', JSON.stringify(this.pc.localDescription)); + if (this.config.dcSignaling && this.dc.readyState === 'open') { this.logger.logDebug('connected, sending answer through data channel', this.pc.localDescription); try { - this.dc.send(JSON.stringify(this.pc.localDescription)); + this.dc.send(encodeDCMsg(this.enc, DCMessageType.SDP, this.pc.localDescription)); } catch (err) { this.logger.logErr('failed to send on datachannel', err); } } else { if (this.config.dcSignaling) { - this.logger.logDebug('dc not connected yet, emitting answer', this.dc.readyState); + this.logger.logDebug('dc not connected, emitting answer'); } this.emit('answer', this.pc.localDescription); } @@ -349,6 +378,22 @@ export class RTCPeer extends EventEmitter { return this.pc.getStats(null); } + public handleMetrics(lossRate: number, jitter: number) { + try { + if (lossRate >= 0) { + this.dc.send(encodeDCMsg(this.enc, DCMessageType.LossRate, lossRate)); + } + if (this.rtt > 0) { + this.dc.send(encodeDCMsg(this.enc, DCMessageType.RoundTripTime, this.rtt)); + } + if (jitter > 0) { + this.dc.send(encodeDCMsg(this.enc, DCMessageType.Jitter, jitter)); + } + } catch (err) { + this.logger.logErr('failed to send metrics through dc', err); + } + } + static async getVideoCodec(mimeType: string) { if (RTCRtpReceiver.getCapabilities) { const videoCapabilities = await RTCRtpReceiver.getCapabilities('video'); diff --git a/src/rtc_stats.ts b/src/rtc_stats.ts index 355dca3..3ef33c1 100644 --- a/src/rtc_stats.ts +++ b/src/rtc_stats.ts @@ -47,6 +47,15 @@ export function newRTCRemoteInboundStats(report: any) { }; } +export function newRTCRemoteOutboundStats(report: any) { + return { + timestamp: report.timestamp, + kind: report.kind, + packetsSent: report.packetsSent, + bytesSent: report.bytesSent, + }; +} + export function newRTCCandidatePairStats(report: any, reports: RTCStatsReport): RTCCandidatePairStats { let local; let remote; @@ -98,12 +107,7 @@ export function parseSSRCStats(reports: RTCStatsReport): SSRCStats { stats[report.ssrc].remote.in = newRTCRemoteInboundStats(report); break; case 'remote-outbound-rtp': - stats[report.ssrc].remote.out = { - timestamp: report.timestamp, - kind: report.kind, - packetsSent: report.packetsSent, - bytesSent: report.bytesSent, - }; + stats[report.ssrc].remote.out = newRTCRemoteOutboundStats(report); break; } }); diff --git a/src/setup_jest.ts b/src/setup_jest.ts new file mode 100644 index 0000000..dce8078 --- /dev/null +++ b/src/setup_jest.ts @@ -0,0 +1,6 @@ +import {TextEncoder, TextDecoder} from 'util'; + +global.TextEncoder = TextEncoder; + +// @ts-ignore +global.TextDecoder = TextDecoder; diff --git a/src/types/dc_msg.ts b/src/types/dc_msg.ts new file mode 100644 index 0000000..ebd5855 --- /dev/null +++ b/src/types/dc_msg.ts @@ -0,0 +1,13 @@ +export enum DCMessageType { + Ping = 1, + Pong, + SDP, + LossRate, + RoundTripTime, + Jitter, +} + +export type DCMessageSDP = Uint8Array; +export type DCMessageLossRate = number; +export type DCMessageRoundTripTime = number; +export type DCMessageJitter = number; diff --git a/src/types/index.ts b/src/types/index.ts index 8a0d3d0..9b4360c 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -1,2 +1,3 @@ export * from './types'; export * from './webrtc'; +export * from './dc_msg'; diff --git a/tsconfig.json b/tsconfig.json index 1ef3e2a..fddfa35 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -19,5 +19,5 @@ ] }, }, - "exclude": ["**/node_modules", "lib/**", "tmp/**", "**/*.test.js", "**/*.test.ts"] + "exclude": ["**/node_modules", "lib/**", "tmp/**", "**/*.test.js", "**/*.test.ts", "setup_jest*"] }