Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update message handling #614

Merged
merged 6 commits into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions src/lib/probe-validator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import config from 'config';
import TTLCache from '@isaacs/ttlcache';
import { type RedisCluster, getMeasurementRedisClient } from './redis/measurement-client.js';

export class ProbeValidator {
private readonly testIdToProbeId = new TTLCache<string, string>({
ttl: (config.get<number>('measurement.timeout') + 30) * 1000,
});

constructor (private readonly redis: RedisCluster) {}

addValidIds (measurementId: string, testId: string, probeUuid: string): void {
const key = ProbeValidator.getKey(measurementId, testId);
this.testIdToProbeId.set(key, probeUuid);
}

async validateProbe (measurementId: string, testId: string, probeUuid: string): Promise<void> {
const key = ProbeValidator.getKey(measurementId, testId);
let probeId = this.testIdToProbeId.get(key);

if (!probeId) {
probeId = await this.getProbeIdFromRedis(key);
}

if (!probeId) {
throw new Error(`Probe ID not found for key ${key}`);
} else if (probeId !== probeUuid) {
throw new Error(`Probe ID is wrong for key ${key}. Expected: ${probeId}, actual: ${probeUuid}`);
}
}

async getProbeIdFromRedis (key: string) {
return this.redis.hGet('gp:test-to-probe', key);
}

static getKey (measurementId: string, testId: string) {
return `${measurementId}_${testId}`;
}
}

let probeValidator: ProbeValidator;

export const getProbeValidator = () => {
if (!probeValidator) {
probeValidator = new ProbeValidator(getMeasurementRedisClient());
}

return probeValidator;
};
6 changes: 4 additions & 2 deletions src/lib/ws/gateway.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { getMetricsAgent } from '../metrics.js';
import { listenMeasurementRequest } from '../../measurement/handler/request.js';
import { handleMeasurementAck } from '../../measurement/handler/ack.js';
import { handleMeasurementResult } from '../../measurement/handler/result.js';
import { handleMeasurementProgress } from '../../measurement/handler/progress.js';
Expand Down Expand Up @@ -41,9 +42,10 @@ io
socket.on('probe:isIPv4Supported:update', handleIsIPv4SupportedUpdate(probe));
socket.on('probe:dns:update', handleDnsUpdate(probe));
socket.on('probe:stats:report', handleStatsReport(probe));
socket.onAnyOutgoing(listenMeasurementRequest(probe));
subscribeWithHandler(socket, 'probe:measurement:ack', handleMeasurementAck(probe));
subscribeWithHandler(socket, 'probe:measurement:progress', handleMeasurementProgress);
subscribeWithHandler(socket, 'probe:measurement:result', handleMeasurementResult);
subscribeWithHandler(socket, 'probe:measurement:progress', handleMeasurementProgress(probe));
subscribeWithHandler(socket, 'probe:measurement:result', handleMeasurementResult(probe));

socket.on('disconnect', (reason) => {
logger.debug(`Probe disconnected. (reason: ${reason}) [${socket.id}][${probe.ipAddress}]`);
Expand Down
3 changes: 1 addition & 2 deletions src/measurement/handler/ack.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Probe } from '../../probe/types.js';
import type { MeasurementAckMessage } from '../types.js';

// eslint-disable-next-line @typescript-eslint/no-unused-vars
export const handleMeasurementAck = (_probe: Probe) => async (_data: MeasurementAckMessage, ack: () => void): Promise<void> => {
export const handleMeasurementAck = (_probe: Probe) => async (_data: null, ack: () => void): Promise<void> => {
ack();
};
7 changes: 5 additions & 2 deletions src/measurement/handler/progress.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { getMeasurementRunner } from '../runner.js';
import type { Probe } from '../../probe/types.js';
import type { MeasurementProgressMessage } from '../types.js';
import { getMeasurementRunner } from '../runner.js';
import { getProbeValidator } from '../../lib/probe-validator.js';

const runner = getMeasurementRunner();

export const handleMeasurementProgress = async (data: MeasurementProgressMessage): Promise<void> => {
export const handleMeasurementProgress = (probe: Probe) => async (data: MeasurementProgressMessage): Promise<void> => {
await getProbeValidator().validateProbe(data.measurementId, data.testId, probe.uuid);
await runner.recordProgress(data);
};
12 changes: 12 additions & 0 deletions src/measurement/handler/request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { Probe } from '../../probe/types.js';
import { getProbeValidator } from '../../lib/probe-validator.js';
import { MeasurementRequestMessage } from '../types.js';

export const listenMeasurementRequest = (probe: Probe) => (event: string, data: unknown) => {
if (event !== 'probe:measurement:request') {
return;
}

const message = data as MeasurementRequestMessage;
getProbeValidator().addValidIds(message.measurementId, message.testId, probe.uuid);
};
5 changes: 4 additions & 1 deletion src/measurement/handler/result.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import type { Probe } from '../../probe/types.js';
import type { MeasurementResultMessage } from '../types.js';
import { getMeasurementRunner } from '../runner.js';
import { getProbeValidator } from '../../lib/probe-validator.js';

const runner = getMeasurementRunner();

export const handleMeasurementResult = async (data: MeasurementResultMessage): Promise<void> => {
export const handleMeasurementResult = (probe: Probe) => async (data: MeasurementResultMessage): Promise<void> => {
await getProbeValidator().validateProbe(data.measurementId, data.testId, probe.uuid);
await runner.recordResult(data);
};
7 changes: 4 additions & 3 deletions src/measurement/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { Probe } from '../probe/types.js';
import { getMetricsAgent, type MetricsAgent } from '../lib/metrics.js';
import type { MeasurementStore } from './store.js';
import { getMeasurementStore } from './store.js';
import type { MeasurementRequest, MeasurementResultMessage, MeasurementProgressMessage, UserRequest } from './types.js';
import type { MeasurementRequest, MeasurementResultMessage, MeasurementProgressMessage, UserRequest, MeasurementRequestMessage } from './types.js';
import { rateLimit } from '../lib/rate-limiter/rate-limiter-post.js';
import type { ExtendedContext } from '../types.js';

Expand Down Expand Up @@ -64,7 +64,7 @@ export class MeasurementRunner {
const maxInProgressTests = config.get<number>('measurement.maxInProgressTests');
onlineProbesMap.forEach((probe, index) => {
const inProgressUpdates = request.inProgressUpdates && inProgressTests++ < maxInProgressTests;
this.io.of(PROBES_NAMESPACE).to(probe.client).emit('probe:measurement:request', {
const requestMessage: MeasurementRequestMessage = {
measurementId,
testId: index.toString(),
measurement: {
Expand All @@ -73,7 +73,8 @@ export class MeasurementRunner {
target: request.target,
inProgressUpdates,
},
});
};
this.io.of(PROBES_NAMESPACE).to(probe.client).emit('probe:measurement:request', requestMessage);
});
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/measurement/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ export class MeasurementStore {
async createMeasurement (request: MeasurementRequest, onlineProbesMap: Map<number, Probe>, allProbes: (Probe | OfflineProbe)[]): Promise<string> {
const id = cryptoRandomString({ length: 16, type: 'alphanumeric' });
const key = getMeasurementKey(id);

const probesAwaitingTtl = config.get<number>('measurement.timeout') + 5;
const startTime = new Date();
const results = this.probesToResults(allProbes, request.type);

Expand All @@ -73,14 +71,17 @@ export class MeasurementStore {
results,
};
const measurementWithoutDefaults = this.removeDefaults(measurement, request);
const testsToProbes = Object.fromEntries(Array.from(onlineProbesMap, ([ testId, probe ]) => [ `${id}_${testId}`, probe.uuid ]));

await Promise.all([
this.redis.hSet('gp:in-progress', id, startTime.getTime()),
this.redis.set(getMeasurementKey(id, 'probes_awaiting'), onlineProbesMap.size, { EX: probesAwaitingTtl }),
this.redis.set(getMeasurementKey(id, 'probes_awaiting'), onlineProbesMap.size, { EX: config.get<number>('measurement.timeout') + 30 }),
this.redis.json.set(key, '$', measurementWithoutDefaults),
this.redis.json.set(getMeasurementKey(id, 'ips'), '$', allProbes.map(probe => probe.ipAddress)),
this.redis.expire(key, config.get<number>('measurement.resultTTL')),
this.redis.expire(getMeasurementKey(id, 'ips'), config.get<number>('measurement.resultTTL')),
!_.isEmpty(testsToProbes) && this.redis.hSet('gp:test-to-probe', testsToProbes),
!_.isEmpty(testsToProbes) && this.redis.hExpire('gp:test-to-probe', Object.keys(testsToProbes), config.get<number>('measurement.timeout') + 30),
]);

return id;
Expand Down
12 changes: 9 additions & 3 deletions src/measurement/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,16 @@ export type MeasurementRecord = {
/**
* Probe Messages
*/
export type MeasurementAckMessage = {
id: string;

export type MeasurementRequestMessage = {
testId: string;
measurementId: string;
};
measurement: MeasurementOptions & {
type: MeasurementRequest['type'];
target: MeasurementRequest['target'];
inProgressUpdates: MeasurementRequest['inProgressUpdates'];
}
}

export type MeasurementProgressMessage = {
testId: string;
Expand Down
2 changes: 0 additions & 2 deletions test/tests/integration/measurement/create-measurement.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,6 @@ describe('Create measurement', () => {
.send({
type: 'ping',
target: 'example.com',
limit: 2,
locations: [{
continent: 'NA',
}],
Expand Down Expand Up @@ -666,7 +665,6 @@ describe('Create measurement', () => {
await requestAgent.get(`/v1/measurements/${id2}`)
.expect(200)
.expect((response) => {
expect(response.body.limit).to.equal(2);
expect(response.body.locations).to.deep.equal([{ continent: 'NA' }]);
expect(response.body.results[0].result.status).to.equal('offline');
expect(response.body.results[0].result.rawOutput).to.equal('This probe is currently offline. Please try again later.');
Expand Down
29 changes: 26 additions & 3 deletions test/tests/unit/measurement/store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { PingResult } from '../../../../src/measurement/types.js';

const getProbe = (id: string, ip: string) => ({
ipAddress: ip,
uuid: `${id}-${id}-${id}-${id}-${id}`,
altIpAddresses: [],
location: {
network: id,
Expand Down Expand Up @@ -37,6 +38,7 @@ describe('measurement store', () => {
hScan: sandbox.stub(),
hDel: sandbox.stub(),
hSet: sandbox.stub(),
hExpire: sandbox.stub(),
set: sandbox.stub(),
expire: sandbox.stub(),
del: sandbox.stub(),
Expand Down Expand Up @@ -149,10 +151,11 @@ describe('measurement store', () => {
[ getProbe('z', '1.1.1.1'), getProbe('10', '2.2.2.2'), getProbe('x', '3.3.3.3'), getProbe('0', '4.4.4.4') ],
);

expect(redisMock.hSet.callCount).to.equal(1);
expect(redisMock.hSet.callCount).to.equal(2);

expect(redisMock.hSet.args[0]).to.deep.equal([ 'gp:in-progress', 'measurementid', now ]);
expect(redisMock.set.callCount).to.equal(1);
expect(redisMock.set.args[0]).to.deep.equal([ 'gp:m:{measurementid}:probes_awaiting', 4, { EX: 35 }]);
expect(redisMock.set.args[0]).to.deep.equal([ 'gp:m:{measurementid}:probes_awaiting', 4, { EX: 60 }]);
expect(redisMock.json.set.callCount).to.equal(2);

expect(redisMock.json.set.args[0]).to.deep.equal([ 'gp:m:{measurementid}:results', '$', {
Expand Down Expand Up @@ -235,6 +238,26 @@ describe('measurement store', () => {
expect(redisMock.json.set.args[1]).to.deep.equal([ 'gp:m:{measurementid}:ips', '$', [ '1.1.1.1', '2.2.2.2', '3.3.3.3', '4.4.4.4' ] ]);

expect(redisMock.expire.args[1]).to.deep.equal([ 'gp:m:{measurementid}:ips', 604800 ]);

expect(redisMock.hSet.args[1]).to.deep.equal([ 'gp:test-to-probe', {
measurementid_0: 'z-z-z-z-z',
measurementid_1: '10-10-10-10-10',
measurementid_2: 'x-x-x-x-x',
measurementid_3: '0-0-0-0-0',
}]);

expect(redisMock.hExpire.callCount).to.equal(1);

expect(redisMock.hExpire.args[0]).to.deep.equal([
'gp:test-to-probe',
[
'measurementid_0',
'measurementid_1',
'measurementid_2',
'measurementid_3',
],
60,
]);
});

it('should initialize measurement object with the proper default values', async () => {
Expand Down Expand Up @@ -400,7 +423,7 @@ describe('measurement store', () => {
},
]);

expect(redisMock.set.args[0]).to.deep.equal([ 'gp:m:{measurementid}:probes_awaiting', 0, { EX: 35 }]);
expect(redisMock.set.args[0]).to.deep.equal([ 'gp:m:{measurementid}:probes_awaiting', 0, { EX: 60 }]);
});

it('should store non-default fields of the measurement request', async () => {
Expand Down
41 changes: 41 additions & 0 deletions test/tests/unit/probe-validator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { ProbeValidator } from '../../../src/lib/probe-validator.js';
import { RedisCluster } from '../../../src/lib/redis/shared.js';

describe('ProbeValidator', () => {
const sandbox = sinon.createSandbox();
const redis = { hGet: sandbox.stub() };
const probeValidator = new ProbeValidator(redis as unknown as RedisCluster);

beforeEach(() => {
redis.hGet.resolves(undefined);
});

it('should pass through valid probe id', async () => {
probeValidator.addValidIds('measurement-id', 'test-id', 'probe-uuid');
await probeValidator.validateProbe('measurement-id', 'test-id', 'probe-uuid');
});

it('should throw for invalid probe id', async () => {
probeValidator.addValidIds('measurement-id', 'test-id', 'probe-uuid');
const error = await probeValidator.validateProbe('measurement-id', 'test-id', 'invalid-probe-uuid').catch(err => err);
expect(error.message).to.equal('Probe ID is wrong for key measurement-id_test-id. Expected: probe-uuid, actual: invalid-probe-uuid');
});

it('should throw for missing key', async () => {
const error = await probeValidator.validateProbe('missing-measurement-id', 'test-id', 'probe-uuid').catch(err => err);
expect(error.message).to.equal('Probe ID not found for key missing-measurement-id_test-id');
});

it('should search key in redis if not found locally', async () => {
redis.hGet.resolves('probe-uuid');
await probeValidator.validateProbe('only-redis-measurement-id', 'test-id', 'probe-uuid');
});

it('should throw if redis probe id is different', async () => {
redis.hGet.resolves('different-probe-uuid');
const error = await probeValidator.validateProbe('only-redis-measurement-id', 'test-id', 'probe-uuid').catch(err => err);
expect(error.message).to.equal('Probe ID is wrong for key only-redis-measurement-id_test-id. Expected: different-probe-uuid, actual: probe-uuid');
});
});