Skip to content

Commit

Permalink
chore(session replay): missing files for event compression
Browse files Browse the repository at this point in the history
  • Loading branch information
jxiwang committed Sep 24, 2024
1 parent df7983e commit ecde7f2
Show file tree
Hide file tree
Showing 2 changed files with 255 additions and 0 deletions.
95 changes: 95 additions & 0 deletions packages/session-replay-browser/src/events/event-compressor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import type { eventWithTime } from '@amplitude/rrweb-types';
import { SessionReplayJoinedConfig } from 'src/config/types';
import { SessionReplayEventsManager } from 'src/typings/session-replay';
import { pack } from '@amplitude/rrweb';
import { getGlobalScope } from '@amplitude/analytics-client-common';

interface TaskQueue {
event: eventWithTime;
sessionId: number;
}

export class EventCompressor {
taskQueue: TaskQueue[] = [];
isProcessing = false;
eventsManager?: SessionReplayEventsManager<'replay' | 'interaction', string>;
config: SessionReplayJoinedConfig;
deviceId: string | undefined;
canUseIdleCallback: boolean | undefined;

constructor(
eventsManager: SessionReplayEventsManager<'replay' | 'interaction', string>,
config: SessionReplayJoinedConfig,
deviceId: string | undefined,
) {
const globalScope = getGlobalScope();
this.canUseIdleCallback = globalScope && 'requestIdleCallback' in globalScope;
this.eventsManager = eventsManager;
this.config = config;
this.deviceId = deviceId;
}

// Schedule processing during idle time
public scheduleIdleProcessing(): void {
if (!this.isProcessing) {
this.isProcessing = true;
requestIdleCallback(
(idleDeadline) => {
this.processQueue(idleDeadline);
},
{ timeout: 2000 },
);
}
}

// Add an event to the task queue if idle callback is supported or compress the event directly
public enqueueEvent(event: eventWithTime, sessionId: number): void {
if (this.canUseIdleCallback && this.config.performanceConfig?.enabled) {
this.taskQueue.push({ event, sessionId });
this.scheduleIdleProcessing();
} else {
this.addCompressedEvent(event, sessionId);
}
}

// Process the task queue during idle time
public processQueue(idleDeadline: IdleDeadline): void {
// Process tasks while there's idle time or until the max number of tasks is reached
while (this.taskQueue.length > 0 && (idleDeadline.timeRemaining() > 0 || idleDeadline.didTimeout)) {
const task = this.taskQueue.shift();
if (task) {
const { event, sessionId } = task;
this.addCompressedEvent(event, sessionId);
}
}

// If there are still tasks in the queue, schedule the next idle callback
if (this.taskQueue.length > 0) {
requestIdleCallback(
(idleDeadline) => {
this.processQueue(idleDeadline);
},
{ timeout: 2000 },
);
} else {
this.isProcessing = false;
}
}

compressEvents = (event: eventWithTime) => {
const packedEvent = pack(event);
return JSON.stringify(packedEvent);
};

public addCompressedEvent = (event: eventWithTime, sessionId: number) => {
const compressedEvent = this.compressEvents(event);

if (this.eventsManager && this.deviceId) {
this.eventsManager.addEvent({
event: { type: 'replay', data: compressedEvent },
sessionId,
deviceId: this.deviceId,
});
}
};
}
160 changes: 160 additions & 0 deletions packages/session-replay-browser/test/event-compressor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import { Logger } from '@amplitude/analytics-types';
import { SessionReplayLocalConfig } from '../src/config/local-config';
import { EventCompressor } from '../src/events/event-compressor';
import { createEventsManager } from '../src/events/events-manager';
import { SessionReplayEventsManager } from '../src/typings/session-replay';

const mockEvent = {
type: 4,
data: { href: 'https://analytics.amplitude.com/', width: 1728, height: 154 },
timestamp: 1687358660935,
};

type MockedLogger = jest.Mocked<Logger>;

describe('EventCompressor', () => {
let eventsManager: SessionReplayEventsManager<'replay' | 'interaction', string>;
let eventCompressor: EventCompressor;
const mockRequestIdleCallback = jest.fn((callback: (deadline: IdleDeadline) => void) => {
const mockIdleDeadline: IdleDeadline = {
timeRemaining: () => 50,
didTimeout: false,
};
return callback(mockIdleDeadline);
});
(global.requestIdleCallback as jest.Mock) = mockRequestIdleCallback;
const mockLoggerProvider: MockedLogger = {
error: jest.fn(),
log: jest.fn(),
disable: jest.fn(),
enable: jest.fn(),
warn: jest.fn(),
debug: jest.fn(),
};
const deviceId = '4abce3b0-0b1b-4b3b-8b3b-3b0b1b4b3b8b';
const sessionId = 123;
let deferEvents: typeof global.requestIdleCallback;
const config = new SessionReplayLocalConfig('static_key', {
loggerProvider: mockLoggerProvider,
sampleRate: 1,
performanceConfig: {
enabled: true,
},
});

beforeEach(async () => {
eventsManager = await createEventsManager<'replay'>({
config,
type: 'replay',
});
eventCompressor = new EventCompressor(eventsManager, config, deviceId);
deferEvents = global.requestIdleCallback;
});

afterEach(() => {
jest.resetAllMocks();
global.requestIdleCallback = deferEvents;
jest.useRealTimers();
});

test('should schedule idle processing if not already processing', () => {
const scheduleIdleProcessingMock = jest.spyOn(eventCompressor, 'scheduleIdleProcessing');
expect(eventCompressor.isProcessing).toBe(false);

eventCompressor.enqueueEvent(mockEvent, 123);

expect(scheduleIdleProcessingMock).toHaveBeenCalledTimes(1);
expect(mockRequestIdleCallback).toHaveBeenCalledTimes(1);
});

test('should not schedule idle processing if already processing', () => {
eventCompressor.isProcessing = true;
eventCompressor.scheduleIdleProcessing();
expect(mockRequestIdleCallback).not.toHaveBeenCalled();
});

test('should immediately compress and add the event if idle callback is not supported', () => {
eventCompressor.canUseIdleCallback = false;
const addEventMock = jest.spyOn(eventsManager, 'addEvent');
eventCompressor.enqueueEvent(mockEvent, sessionId);

expect(eventCompressor.taskQueue.length).toBe(0);
expect(addEventMock).toHaveBeenCalled();
});

test('should process events in the queue and add compressed events', () => {
eventCompressor.taskQueue.push({ event: mockEvent, sessionId });
eventCompressor.taskQueue.push({ event: mockEvent, sessionId });

const mockIdleDeadline = { timeRemaining: () => 0, didTimeout: true } as IdleDeadline;

const addEventMock = jest.spyOn(eventsManager, 'addEvent');

eventCompressor.processQueue(mockIdleDeadline);

expect(addEventMock).toHaveBeenCalled();
expect(eventCompressor.taskQueue.length).toBe(0);
});

test('should call requestIdleCallback if there are still tasks in the queue', () => {
eventCompressor.taskQueue.push({ event: mockEvent, sessionId });
eventCompressor.taskQueue.push({ event: mockEvent, sessionId });

const mockIdleDeadline = { timeRemaining: () => 0, didTimeout: false } as IdleDeadline;

const processQueueMock = jest.spyOn(eventCompressor, 'processQueue');

eventCompressor.processQueue(mockIdleDeadline);
expect(processQueueMock).toHaveBeenCalledTimes(1);
expect(mockRequestIdleCallback).toHaveBeenCalled();
});

test('should not call requestIdleCallback if preformance config is undefined', () => {
eventCompressor.config.performanceConfig = undefined;

const addCompressedEventMock = jest.spyOn(eventCompressor, 'addCompressedEvent');

eventCompressor.enqueueEvent(mockEvent, sessionId);

expect(eventCompressor.taskQueue.length).toBe(0);
expect(addCompressedEventMock).toHaveBeenCalledWith(mockEvent, sessionId);
});

test('should set isProcessing to false when taskQueue is empty', () => {
eventCompressor.taskQueue = [];
const mockIdleDeadline = { timeRemaining: () => 0, didTimeout: false } as IdleDeadline;

const processQueueMock = jest.spyOn(eventCompressor, 'processQueue');
eventCompressor.processQueue(mockIdleDeadline);

expect(processQueueMock).toHaveBeenCalled();
expect(eventCompressor.isProcessing).toBe(false);
});

test('should schedule another idle callback if there are still tasks', () => {
eventCompressor.taskQueue.push({ event: mockEvent, sessionId });
eventCompressor.taskQueue.push({ event: mockEvent, sessionId });

const mockIdleDeadline = {
timeRemaining: () => 0,
didTimeout: false,
} as IdleDeadline;

const processQueueMock = jest.spyOn(eventCompressor, 'processQueue');
const requestIdleCallbackSpy = jest.spyOn(global, 'requestIdleCallback');

eventCompressor.processQueue(mockIdleDeadline);

expect(processQueueMock).toHaveBeenCalledTimes(1);

// Verify that requestIdleCallback is called again for the remaining tasks
expect(requestIdleCallbackSpy).toHaveBeenCalledTimes(1);

// Simulate the next recursive call by invoking the callback manually
const idleCallback = requestIdleCallbackSpy.mock.calls[0][0];
idleCallback(mockIdleDeadline);

// Ensure processQueue was called recursively
expect(processQueueMock).toHaveBeenCalledTimes(2);
});
});

0 comments on commit ecde7f2

Please sign in to comment.