Skip to content

Commit

Permalink
add ClientIssue
Browse files Browse the repository at this point in the history
  • Loading branch information
balazskreith committed Apr 4, 2024
1 parent cae39ae commit d34cacb
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 13 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@observertc/observer-js",
"version": "0.40.11-beta",
"version": "0.40.12",
"description": "Server Side NodeJS Library for processing ObserveRTC Samples",
"main": "lib/index.js",
"types": "lib/index.d.ts",
Expand Down
20 changes: 20 additions & 0 deletions src/ObservedClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ import { ObservedSfu } from './ObservedSfu';

const logger = createLogger('ObservedClient');

export interface ClientIssue {
severity: 'critical' | 'major' | 'minor';
timestamp: number;
description?: string;
peerConnectionId?: string,
trackId?: string,
}

export type ObservedClientModel= {
clientId: string;
mediaUnitId: string;
Expand Down Expand Up @@ -46,6 +54,7 @@ export type ObservedClientEvents = {
localCandidate: IceLocalCandidate,
remoteCandidate: IceRemoteCandidate,
}],
issue: [ClientIssue],
usingturn: [boolean],
usermediaerror: [string],
};
Expand Down Expand Up @@ -393,6 +402,17 @@ export class ObservedClient<AppData extends Record<string, unknown> = Record<str
this._left = timestamp;
break;
}
case CallEventType.CLIENT_ISSUE: {
const severity = callEvent.value ? callEvent.value as ClientIssue['severity'] : 'minor';

this.emit('issue', {
severity,
timestamp: timestamp ?? Date.now(),
description: callEvent.message,
peerConnectionId: callEvent.peerConnectionId,
trackId: callEvent.mediaTrackId,
});
}
}
this.reports.addCallEventReport({
serviceId: this.serviceId,
Expand Down
9 changes: 9 additions & 0 deletions src/ObservedOutboundTrack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export type ObservedOutboundTrackModel<K extends MediaKind> = {
}

export type ObservedOutboundTrackEvents = {
qualitylimitationchanged: [string];
update: [{
elapsedTimeInMs: number;
}],
Expand Down Expand Up @@ -197,6 +198,14 @@ export class ObservedOutboundTrack<Kind extends MediaKind> extends EventEmitter
if (videoSample?.framesSent && lastVideoStats?.framesSent && lastVideoStats.framesSent < videoSample.framesSent) {
deltaSentFrames = videoSample.framesSent - lastVideoStats.framesSent;
}

if (videoSample.qualityLimitationReason && lastVideoStats?.qualityLimitationReason !== videoSample.qualityLimitationReason) {
this.emit('qualitylimitationchanged', videoSample.qualityLimitationReason);
}
} else if (this.kind === 'audio') {
// const audioSample = sample as OutboundAudioTrack;
// const lastAudioStats = lastStat as OutboundAudioTrack | undefined;

}
const stats: ObservedOutboundTrackStats<Kind> = {
...sample,
Expand Down
20 changes: 13 additions & 7 deletions src/Observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { PartialBy } from './common/utils';
import { createCallEndedEventReport, createCallStartedEventReport } from './common/callEventReports';
import { ObserverSinkContext } from './common/types';
import { ObservedSfu, ObservedSfuModel } from './ObservedSfu';
import { CallSummaryMonitor } from './monitors/CallSummaryMonitor';
import { CallSummaryMonitor, CallSummaryMonitorConfig } from './monitors/CallSummaryMonitor';
import { TurnUsageMonitor } from './monitors/TurnUsageMonitor';
import { ObservedClient } from './ObservedClient';
import { ObservedPeerConnection } from './ObservedPeerConnection';
Expand Down Expand Up @@ -104,13 +104,19 @@ export class Observer extends EventEmitter {
}

public createObservedCall<T extends Record<string, unknown> = Record<string, unknown>>(
config: PartialBy<ObservedCallModel, 'serviceId'> & { appData: T, started?: number }
config: PartialBy<ObservedCallModel, 'serviceId'> & { appData: T, started?: number, reportCallStarted?: boolean, reportCallEnded?: boolean }
): ObservedCall<T> {
if (this._closed) {
throw new Error('Attempted to create a call source on a closed observer');
}

const { appData, started = Date.now(), ...model } = config;
const {
appData,
started = Date.now(),
reportCallEnded = true,
reportCallStarted = true,
...model
} = config;
const call = new ObservedCall({
...model,
serviceId: this.config.defaultServiceId,
Expand All @@ -121,7 +127,7 @@ export class Observer extends EventEmitter {

call.once('close', () => {
this._observedCalls.delete(call.callId);
this.reports.addCallEventReport(createCallEndedEventReport(
reportCallEnded && this.reports.addCallEventReport(createCallEndedEventReport(
call.serviceId,
call.roomId,
call.callId,
Expand All @@ -130,7 +136,7 @@ export class Observer extends EventEmitter {
});

this._observedCalls.set(call.callId, call);
this.reports.addCallEventReport(createCallStartedEventReport(
reportCallStarted && this.reports.addCallEventReport(createCallStartedEventReport(
call.serviceId,
call.roomId,
call.callId,
Expand Down Expand Up @@ -165,14 +171,14 @@ export class Observer extends EventEmitter {
return sfu;
}

public createCallSummaryMonitor(options?: { timeoutAfterCallClose?: number }): CallSummaryMonitor {
public createCallSummaryMonitor(options?: CallSummaryMonitorConfig & { timeoutAfterCallClose?: number }): CallSummaryMonitor {
if (this._closed) throw new Error('Cannot create a call summary monitor on a closed observer');

const existingMonitor = this._monitors.get(CallSummaryMonitor.name);

if (existingMonitor) return existingMonitor as CallSummaryMonitor;

const monitor = new CallSummaryMonitor();
const monitor = new CallSummaryMonitor(options);
const onNewCall = (call: ObservedCall) => {
monitor.addCall(call);

Expand Down
2 changes: 1 addition & 1 deletion src/common/CallEventType.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// eslint-disable-next-line no-shadow

export enum CallEventType {
CALL_STARTED = 'CALL_STARTED',
CALL_ENDED = 'CALL_ENDED',
Expand All @@ -20,4 +19,5 @@ export enum CallEventType {
DATA_CHANNEL_ERROR = 'DATA_CHANNEL_ERROR',
NEGOTIATION_NEEDED = 'NEGOTIATION_NEEDED',
SIGNALING_STATE_CHANGE = 'SIGNALING_STATE_CHANGE',
CLIENT_ISSUE = 'CLIENT_ISSUE',
}
5 changes: 5 additions & 0 deletions src/monitors/CallSummary.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import { ClientIssue } from '../ObservedClient';

export interface ClientSummary {
clientId: string;
mediaUnitId: string;
userId?: string;
joined: number;
left?: number;
durationInMs: number;
avgOutboundAudioBitrate: number,
avgOutboundVideoBitrate: number,
avgInboundAudioBitrate: number,
avgInboundVideoBitrate: number,
ewmaRttInMs: number,
usedTurn: boolean;
issues: ClientIssue[];
}

export interface CallSummary {
Expand Down
80 changes: 76 additions & 4 deletions src/monitors/CallSummaryMonitor.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import { EventEmitter } from 'events';
import { CallSummary } from './CallSummary';
import { CallSummary, ClientSummary } from './CallSummary';
import { ObservedCall } from '../ObservedCall';
import { ObservedClient } from '../ObservedClient';
import { ClientIssue, ObservedClient } from '../ObservedClient';
import { ObservedOutboundTrack } from '../ObservedOutboundTrack';
import { ObservedPeerConnection } from '../ObservedPeerConnection';

export type CallSummaryMonitorEvents = {
close: [],
summary: [CallSummary],
}

export type CallSummaryMonitorConfig = {
detectUserMediaIssues?: boolean;
detectMediaTrackQualityLimitationIssues?: boolean;
}

export declare interface CallSummaryMonitor {
on<U extends keyof CallSummaryMonitorEvents>(event: U, listener: (...args: CallSummaryMonitorEvents[U]) => void): this;
off<U extends keyof CallSummaryMonitorEvents>(event: U, listener: (...args: CallSummaryMonitorEvents[U]) => void): this;
Expand All @@ -19,7 +26,9 @@ export class CallSummaryMonitor extends EventEmitter {
private readonly _summaries = new Map<string, CallSummary>();

private _closed = false;
public constructor() {
public constructor(
public readonly config: CallSummaryMonitorConfig = {},
) {
super();
}

Expand Down Expand Up @@ -50,7 +59,7 @@ export class CallSummaryMonitor extends EventEmitter {

private _addClient(callSummary: CallSummary, client: ObservedClient) {
if (this.closed) return;
const clientSummary = {
const clientSummary: ClientSummary = {
clientId: client.clientId,
mediaUnitId: client.mediaUnitId,
durationInMs: 0,
Expand All @@ -61,6 +70,8 @@ export class CallSummaryMonitor extends EventEmitter {
ewmaRttInMs: 0,
joined: client.created,
userId: client.userId,
usedTurn: false,
issues: [],
};

callSummary.clients.push(clientSummary);
Expand All @@ -71,7 +82,34 @@ export class CallSummaryMonitor extends EventEmitter {
}
};

const onUsingTurn = (usingTurn: boolean) => {
clientSummary.usedTurn ||= usingTurn;
};

const onIssue = (issue: ClientIssue) => {
clientSummary.issues.push(issue);
};

const onUserMediaError = (error: string) => {
if (!this.config.detectUserMediaIssues) return;

// maybe the client also sned this issue, in which case the timestamp can be more accurate
const alreadyDetected = clientSummary.issues.find((issue) => issue.severity === 'critical' && issue.description === error);

!alreadyDetected && clientSummary.issues.push({
severity: 'critical',
timestamp: Date.now(),
description: error,
});
};

const onNewPeerConnection = (peerConnection: ObservedPeerConnection) => this._addPeerConnection(clientSummary, peerConnection);

client.on('update', updateClient);
client.on('usingturn', onUsingTurn);
client.on('issue', onIssue);
client.on('usermediaerror', onUserMediaError);
client.on('newpeerconnection', onNewPeerConnection);
client.once('close', () => {
const elapsedTimeInS = (Date.now() - client.created) / 1000;

Expand All @@ -81,7 +119,41 @@ export class CallSummaryMonitor extends EventEmitter {
clientSummary.avgOutboundVideoBitrate = (client.totalSentBytes * 8) / elapsedTimeInS;
clientSummary.durationInMs = elapsedTimeInS * 1000;
client.off('update', updateClient);
client.off('usingturn', onUsingTurn);
client.off('issue', onIssue);
client.off('usermediaerror', onUserMediaError);
client.off('newpeerconnection', onNewPeerConnection);

callSummary.durationInMs += clientSummary.durationInMs;
});
}

private _addPeerConnection(clientSummary: ClientSummary, peerConnection: ObservedPeerConnection) {
const onOutboundVideoTrack = (track: ObservedOutboundTrack<'video'>) => this._addOutboundVideoTrack(clientSummary, track);

peerConnection.on('newoutboundvideotrack', onOutboundVideoTrack);
peerConnection.once('close', () => {
peerConnection.off('newoutboundvideotrack', onOutboundVideoTrack);
});
}

private _addOutboundVideoTrack(clientSummary: ClientSummary, track: ObservedOutboundTrack<'video'>) {
const onQualityLimitationChanged = (reason: string) => {
if (!this.config.detectMediaTrackQualityLimitationIssues) return;

clientSummary.issues.push({
severity: 'minor',
timestamp: Date.now(),
description: reason,
peerConnectionId: track.peerConnectionId,
trackId: track.trackId,
});
};

track.once('close', () => {
track.off('qualitylimitationchanged', onQualityLimitationChanged);
});
track.on('qualitylimitationchanged', onQualityLimitationChanged);
}

public takeSummary(callId: string): CallSummary | undefined {
Expand Down

0 comments on commit d34cacb

Please sign in to comment.