Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(session replay): delay compresion #885

Merged
merged 10 commits into from
Sep 24, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export class SessionReplayPlugin implements EnrichmentPlugin {
configEndpointUrl: this.options.configEndpointUrl,
shouldInlineStylesheet: this.options.shouldInlineStylesheet,
version: { type: 'plugin', version: VERSION },
performanceConfig: this.options.performanceConfig,
}).promise;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,18 @@ export interface SessionReplayPrivacyConfig {
maskSelector?: string[];
unmaskSelector?: string[];
}

export interface SessionReplayPerformanceConfig {
enabled: boolean;
timeout?: number;
}

export interface SessionReplayOptions {
sampleRate?: number;
privacyConfig?: SessionReplayPrivacyConfig;
debugMode?: boolean;
forceSessionTracking?: boolean;
configEndpointUrl?: string;
shouldInlineStylesheet?: boolean;
performanceConfig?: SessionReplayPerformanceConfig;
}
3 changes: 3 additions & 0 deletions packages/session-replay-browser/src/config/local-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
SessionReplayLocalConfig as ISessionReplayLocalConfig,
InteractionConfig,
PrivacyConfig,
SessionReplayPerformanceConfig,
SessionReplayVersion,
} from './types';

Expand All @@ -26,6 +27,7 @@ export class SessionReplayLocalConfig extends Config implements ISessionReplayLo
configEndpointUrl?: string;
shouldInlineStylesheet?: boolean;
version?: SessionReplayVersion;
performanceConfig?: SessionReplayPerformanceConfig;

constructor(apiKey: string, options: SessionReplayOptions) {
const defaultConfig = getDefaultConfig();
Expand All @@ -45,6 +47,7 @@ export class SessionReplayLocalConfig extends Config implements ISessionReplayLo
this.configEndpointUrl = options.configEndpointUrl;
this.shouldInlineStylesheet = options.shouldInlineStylesheet;
this.version = options.version;
this.performanceConfig = options.performanceConfig;

if (options.privacyConfig) {
this.privacyConfig = options.privacyConfig;
Expand Down
6 changes: 6 additions & 0 deletions packages/session-replay-browser/src/config/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export interface SessionReplayLocalConfig extends Config {
configEndpointUrl?: string;
shouldInlineStylesheet?: boolean;
version?: SessionReplayVersion;
performanceConfig?: SessionReplayPerformanceConfig;
}

export interface SessionReplayJoinedConfig extends SessionReplayLocalConfig {
Expand All @@ -72,4 +73,9 @@ export interface SessionReplayVersion {
type: SessionReplayType;
}

export interface SessionReplayPerformanceConfig {
enabled: boolean;
timeout?: number;
}

export type SessionReplayType = 'standalone' | 'plugin' | 'segment';
100 changes: 100 additions & 0 deletions packages/session-replay-browser/src/events/event-compressor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import type { eventWithTime } from '@amplitude/rrweb-types';
import { SessionReplayJoinedConfig } from 'src/config/types';
import { SessionReplayEventsManager } from 'src/typings/session-replay';
import { pack } from '@amplitude/rrweb';
import { getGlobalScope } from '@amplitude/analytics-client-common';

interface TaskQueue {
event: eventWithTime;
sessionId: number;
}

const DEFAULT_TIMEOUT = 2000;
export class EventCompressor {
taskQueue: TaskQueue[] = [];
isProcessing = false;
eventsManager?: SessionReplayEventsManager<'replay' | 'interaction', string>;
config: SessionReplayJoinedConfig;
deviceId: string | undefined;
canUseIdleCallback: boolean | undefined;
jxiwang marked this conversation as resolved.
Show resolved Hide resolved
timeout: number;

constructor(
eventsManager: SessionReplayEventsManager<'replay' | 'interaction', string>,
config: SessionReplayJoinedConfig,
deviceId: string | undefined,
) {
const globalScope = getGlobalScope();
this.canUseIdleCallback = globalScope && 'requestIdleCallback' in globalScope;
this.eventsManager = eventsManager;
this.config = config;
this.deviceId = deviceId;
this.timeout = config.performanceConfig?.timeout || DEFAULT_TIMEOUT;
}

// Schedule processing during idle time
public scheduleIdleProcessing(): void {
if (!this.isProcessing) {
this.isProcessing = true;
requestIdleCallback(
(idleDeadline) => {
this.processQueue(idleDeadline);
},
{ timeout: this.timeout },
);
}
}

// Add an event to the task queue if idle callback is supported or compress the event directly
public enqueueEvent(event: eventWithTime, sessionId: number): void {
if (this.canUseIdleCallback && this.config.performanceConfig?.enabled) {
this.config.loggerProvider.debug('Enqueuing event for processing during idle time.');
this.taskQueue.push({ event, sessionId });
this.scheduleIdleProcessing();
} else {
this.config.loggerProvider.debug('Processing event without idle callback.');
this.addCompressedEvent(event, sessionId);
}
}

// Process the task queue during idle time
public processQueue(idleDeadline: IdleDeadline): void {
// Process tasks while there's idle time or until the max number of tasks is reached
while (this.taskQueue.length > 0 && (idleDeadline.timeRemaining() > 0 || idleDeadline.didTimeout)) {
const task = this.taskQueue.shift();
if (task) {
const { event, sessionId } = task;
this.addCompressedEvent(event, sessionId);
}
}

// If there are still tasks in the queue, schedule the next idle callback
if (this.taskQueue.length > 0) {
requestIdleCallback(
(idleDeadline) => {
this.processQueue(idleDeadline);
},
{ timeout: this.timeout },
);
} else {
this.isProcessing = false;
}
}

compressEvent = (event: eventWithTime) => {
const packedEvent = pack(event);
return JSON.stringify(packedEvent);
};

public addCompressedEvent = (event: eventWithTime, sessionId: number) => {
const compressedEvent = this.compressEvent(event);

if (this.eventsManager && this.deviceId) {
this.eventsManager.addEvent({
event: { type: 'replay', data: compressedEvent },
sessionId,
deviceId: this.deviceId,
});
}
};
}
17 changes: 9 additions & 8 deletions packages/session-replay-browser/src/session-replay.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getAnalyticsConnector, getGlobalScope } from '@amplitude/analytics-client-common';
import { Logger, returnWrapper } from '@amplitude/analytics-core';
import { Logger as ILogger, LogLevel } from '@amplitude/analytics-types';
import { pack, record } from '@amplitude/rrweb';
import { record } from '@amplitude/rrweb';
import { scrollCallback } from '@amplitude/rrweb-types';
import { createSessionReplayJoinedConfigGenerator } from './config/joined-config';
import { SessionReplayJoinedConfig, SessionReplayJoinedConfigGenerator } from './config/types';
Expand Down Expand Up @@ -30,6 +30,7 @@ import {
SessionReplayOptions,
} from './typings/session-replay';
import { VERSION } from './version';
import { EventCompressor } from './events/event-compressor';

type PageLeaveFn = (e: PageTransitionEvent | Event) => void;

Expand All @@ -42,6 +43,7 @@ export class SessionReplay implements AmplitudeSessionReplay {
loggerProvider: ILogger;
recordCancelCallback: ReturnType<typeof record> | null = null;
eventCount = 0;
eventCompressor: EventCompressor | undefined;

// Visible for testing
pageLeaveFns: PageLeaveFn[] = [];
Expand Down Expand Up @@ -129,6 +131,7 @@ export class SessionReplay implements AmplitudeSessionReplay {
}

this.eventsManager = new MultiEventManager<'replay' | 'interaction', string>(...managers);
this.eventCompressor = new EventCompressor(this.eventsManager, this.config, this.getDeviceId());

this.loggerProvider.log('Installing @amplitude/session-replay-browser.');

Expand Down Expand Up @@ -317,7 +320,6 @@ export class SessionReplay implements AmplitudeSessionReplay {
}
this.stopRecordingEvents();
const privacyConfig = this.config.privacyConfig;

this.loggerProvider.log('Session Replay capture beginning.');
this.recordCancelCallback = record({
emit: (event) => {
Expand All @@ -327,13 +329,12 @@ export class SessionReplay implements AmplitudeSessionReplay {
this.sendEvents();
return;
}
const eventString = JSON.stringify(event);
const deviceId = this.getDeviceId();
this.eventsManager &&
deviceId &&
this.eventsManager.addEvent({ event: { type: 'replay', data: eventString }, sessionId, deviceId });

if (this.eventCompressor) {
// Schedule processing during idle time if the browser supports requestIdleCallback
this.eventCompressor.enqueueEvent(event, sessionId);
}
},
packFn: pack,
inlineStylesheet: this.config.shouldInlineStylesheet,
hooks: {
mouseInteraction:
Expand Down
161 changes: 161 additions & 0 deletions packages/session-replay-browser/test/event-compressor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { Logger } from '@amplitude/analytics-types';
import { SessionReplayLocalConfig } from '../src/config/local-config';
import { EventCompressor } from '../src/events/event-compressor';
import { createEventsManager } from '../src/events/events-manager';
import { SessionReplayEventsManager } from '../src/typings/session-replay';

const mockEvent = {
type: 4,
data: { href: 'https://analytics.amplitude.com/', width: 1728, height: 154 },
timestamp: 1687358660935,
};

type MockedLogger = jest.Mocked<Logger>;

describe('EventCompressor', () => {
let eventsManager: SessionReplayEventsManager<'replay' | 'interaction', string>;
let eventCompressor: EventCompressor;
const mockRequestIdleCallback = jest.fn((callback: (deadline: IdleDeadline) => void) => {
const mockIdleDeadline: IdleDeadline = {
timeRemaining: () => 50,
didTimeout: false,
};
return callback(mockIdleDeadline);
});
(global.requestIdleCallback as jest.Mock) = mockRequestIdleCallback;
const mockLoggerProvider: MockedLogger = {
error: jest.fn(),
log: jest.fn(),
disable: jest.fn(),
enable: jest.fn(),
warn: jest.fn(),
debug: jest.fn(),
};
const deviceId = '4abce3b0-0b1b-4b3b-8b3b-3b0b1b4b3b8b';
const sessionId = 123;
let deferEvents: typeof global.requestIdleCallback;
const config = new SessionReplayLocalConfig('static_key', {
loggerProvider: mockLoggerProvider,
sampleRate: 1,
performanceConfig: {
enabled: true,
timeout: 2000,
},
});

beforeEach(async () => {
eventsManager = await createEventsManager<'replay'>({
config,
type: 'replay',
});
eventCompressor = new EventCompressor(eventsManager, config, deviceId);
deferEvents = global.requestIdleCallback;
});

afterEach(() => {
jest.resetAllMocks();
global.requestIdleCallback = deferEvents;
jest.useRealTimers();
});

test('should schedule idle processing if not already processing', () => {
const scheduleIdleProcessingMock = jest.spyOn(eventCompressor, 'scheduleIdleProcessing');
expect(eventCompressor.isProcessing).toBe(false);

eventCompressor.enqueueEvent(mockEvent, 123);

expect(scheduleIdleProcessingMock).toHaveBeenCalledTimes(1);
expect(mockRequestIdleCallback).toHaveBeenCalledTimes(1);
});

test('should not schedule idle processing if already processing', () => {
eventCompressor.isProcessing = true;
eventCompressor.scheduleIdleProcessing();
expect(mockRequestIdleCallback).not.toHaveBeenCalled();
});

test('should immediately compress and add the event if idle callback is not supported', () => {
eventCompressor.canUseIdleCallback = false;
const addEventMock = jest.spyOn(eventsManager, 'addEvent');
eventCompressor.enqueueEvent(mockEvent, sessionId);

expect(eventCompressor.taskQueue.length).toBe(0);
expect(addEventMock).toHaveBeenCalled();
});

test('should process events in the queue and add compressed events', () => {
eventCompressor.taskQueue.push({ event: mockEvent, sessionId });
eventCompressor.taskQueue.push({ event: mockEvent, sessionId });

const mockIdleDeadline = { timeRemaining: () => 0, didTimeout: true } as IdleDeadline;

const addEventMock = jest.spyOn(eventsManager, 'addEvent');

eventCompressor.processQueue(mockIdleDeadline);

expect(addEventMock).toHaveBeenCalled();
expect(eventCompressor.taskQueue.length).toBe(0);
});

test('should call requestIdleCallback if there are still tasks in the queue', () => {
eventCompressor.taskQueue.push({ event: mockEvent, sessionId });
eventCompressor.taskQueue.push({ event: mockEvent, sessionId });

const mockIdleDeadline = { timeRemaining: () => 0, didTimeout: false } as IdleDeadline;

const processQueueMock = jest.spyOn(eventCompressor, 'processQueue');

eventCompressor.processQueue(mockIdleDeadline);
expect(processQueueMock).toHaveBeenCalledTimes(1);
expect(mockRequestIdleCallback).toHaveBeenCalled();
});

test('should not call requestIdleCallback if preformance config is undefined', () => {
eventCompressor.config.performanceConfig = undefined;

const addCompressedEventMock = jest.spyOn(eventCompressor, 'addCompressedEvent');

eventCompressor.enqueueEvent(mockEvent, sessionId);

expect(eventCompressor.taskQueue.length).toBe(0);
expect(addCompressedEventMock).toHaveBeenCalledWith(mockEvent, sessionId);
});

test('should set isProcessing to false when taskQueue is empty', () => {
eventCompressor.taskQueue = [];
const mockIdleDeadline = { timeRemaining: () => 0, didTimeout: false } as IdleDeadline;

const processQueueMock = jest.spyOn(eventCompressor, 'processQueue');
eventCompressor.processQueue(mockIdleDeadline);

expect(processQueueMock).toHaveBeenCalled();
expect(eventCompressor.isProcessing).toBe(false);
});

test('should schedule another idle callback if there are still tasks', () => {
eventCompressor.taskQueue.push({ event: mockEvent, sessionId });
eventCompressor.taskQueue.push({ event: mockEvent, sessionId });

const mockIdleDeadline = {
timeRemaining: () => 0,
didTimeout: false,
} as IdleDeadline;

const processQueueMock = jest.spyOn(eventCompressor, 'processQueue');
const requestIdleCallbackSpy = jest.spyOn(global, 'requestIdleCallback');

eventCompressor.processQueue(mockIdleDeadline);

expect(processQueueMock).toHaveBeenCalledTimes(1);

// Verify that requestIdleCallback is called again for the remaining tasks
expect(requestIdleCallbackSpy).toHaveBeenCalledTimes(1);

// Simulate the next recursive call by invoking the callback manually
const idleCallback = requestIdleCallbackSpy.mock.calls[0][0];
idleCallback(mockIdleDeadline);

// Ensure processQueue was called recursively
expect(processQueueMock).toHaveBeenCalledTimes(2);
});
});
Loading
Loading