From 4705bc95c9e890fe3825f6e552894b748305f057 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 27 May 2024 17:43:54 +1000 Subject: [PATCH] feat: adding `auditEventMultiPathGet` handler to support getting audit events while selecting multiple paths [ci skip] --- src/audit/utils.ts | 108 +++++ src/client/callers/auditEventsMultiPathGet.ts | 12 + src/client/callers/index.ts | 2 + .../handlers/AuditEventsMultiPathGet.ts | 117 ++++++ src/client/handlers/index.ts | 9 +- tests/audit/utils.test.ts | 100 +++++ tests/client/handlers/audit.test.ts | 376 +++++++++++++++++- tests/git/http.test.ts | 2 +- tests/git/utils.test.ts | 2 +- 9 files changed, 704 insertions(+), 24 deletions(-) create mode 100644 src/client/callers/auditEventsMultiPathGet.ts create mode 100644 src/client/handlers/AuditEventsMultiPathGet.ts create mode 100644 tests/audit/utils.test.ts diff --git a/src/audit/utils.ts b/src/audit/utils.ts index 378a2a7396..b24c9b59bd 100644 --- a/src/audit/utils.ts +++ b/src/audit/utils.ts @@ -11,6 +11,7 @@ import type { import type * as nodesEvents from '../nodes/events'; import type * as discoveryEvents from '../discovery/events'; import type { AuditEventId } from '../ids'; +import type { TopicPath } from './types'; import { IdInternal } from '@matrixai/id'; import * as sortableIdUtils from '@matrixai/id/dist/IdSortable'; import * as nodesUtils from '../nodes/utils'; @@ -187,6 +188,34 @@ const topicPaths = [ discoveryCheckRediscoveryTopicPath, ] as const; +// @ts-ignore: recursive definition for defining a tree +type TopicPathTreeNode = Record; + +function generateTopicPathTree() { + const tree: TopicPathTreeNode = {}; + for (const topicPath of topicPaths) { + let node: TopicPathTreeNode = tree; + for (const topicPathElement of topicPath) { + if (node[topicPathElement] == null) node[topicPathElement] = {}; + node = node[topicPathElement]; + } + } + return tree; +} + +const topicPathTree = generateTopicPathTree(); + +function isTopicPath(it: unknown): it is TopicPath { + if (!Array.isArray(it)) return false; + let node = topicPathTree; + for (const pathElement of it) { + if (typeof pathElement !== 'string') return false; + if (node[pathElement] == null) return false; + node = node[pathElement]; + } + return true; +} + // Metrics // Nodes @@ -211,6 +240,81 @@ const metricPaths = [ nodeConnectionOutboundMetricPath, ] as const; +/** + * Will take an array of dot path sorted paths and return the minimal list common paths. + * So sub-paths will be filtered out if we already contain a parent path E.G. `a.b` will be removed if we also include `a`. + * Duplicate paths will be removed, so `a` will be removed if two `a`'s exist. + */ +function filterSubPaths(paths: Array): Array { + let previous: string = ''; + return paths.sort().filter((value, index) => { + // Checking if the current value is included within the previous + if (index === 0 || !value.startsWith(previous)) { + previous = value; + return true; + } + return false; + }, {}); +} + +/** + * This takes N generators that yield data in a sorted order and combines their outputs in a fully sorted order. + * This will only work on pre-sorted outputs from the generator. + */ +async function* genSort( + sortFn: (a: T, b: T) => number, + ...gens: Array> +): AsyncGenerator { + const heads: Array<{ + value: T; + gen: AsyncGenerator; + index: number; + }> = []; + // Seed the heads + let i = 0; + for (const gen of gens) { + const head = await gen.next(); + if (!head.done) { + heads.push({ + value: head.value, + gen, + index: i++, + }); + } + } + if (heads.length === 0) return; + + // Yield from heads until all iterators are done + let first = true; + let previous: T; + try { + while (true) { + // Sort them in order by the sortFn + heads.sort(({ value: a }, { value: b }) => sortFn(a, b)); + // Yield the first in the order + const head = heads[0]; + // Skip any duplicates + if (first || sortFn(previous!, head.value) !== 0) yield head.value; + first = false; + previous = head.value; + // Get the new head for that generator + const next = await head.gen.next(); + // If the generator is done then we remove it from the heads, otherwise update the head value + if (next.done) { + heads.shift(); + } else { + head.value = next.value; + } + // If the last head is done then we break + if (heads.length === 0) return; + } + } finally { + for (const { gen } of heads) { + await gen.return(); + } + } +} + export { extractFromSeek, createAuditEventIdGenerator, @@ -234,8 +338,12 @@ export { fromEventDiscoveryCheckRediscovery, nodeGraphTopicPath, topicPaths, + topicPathTree, + isTopicPath, nodeConnectionMetricPath, nodeConnectionInboundMetricPath, nodeConnectionOutboundMetricPath, metricPaths, + filterSubPaths, + genSort, }; diff --git a/src/client/callers/auditEventsMultiPathGet.ts b/src/client/callers/auditEventsMultiPathGet.ts new file mode 100644 index 0000000000..6a09eebf05 --- /dev/null +++ b/src/client/callers/auditEventsMultiPathGet.ts @@ -0,0 +1,12 @@ +import type { HandlerTypes } from '@matrixai/rpc'; +import type AuditEventsMultiPathGet from '../handlers/AuditEventsMultiPathGet'; +import { ServerCaller } from '@matrixai/rpc'; + +type CallerTypes = HandlerTypes; + +const auditEventsMultiPathGet = new ServerCaller< + CallerTypes['input'], + CallerTypes['output'] +>(); + +export default auditEventsMultiPathGet; diff --git a/src/client/callers/index.ts b/src/client/callers/index.ts index d89931efdc..39ac0d5678 100644 --- a/src/client/callers/index.ts +++ b/src/client/callers/index.ts @@ -3,6 +3,7 @@ import agentStatus from './agentStatus'; import agentStop from './agentStop'; import agentUnlock from './agentUnlock'; import auditEventsGet from './auditEventsGet'; +import auditEventsMultiPathGet from './auditEventsMultiPathGet'; import auditMetricGet from './auditMetricGet'; import gestaltsActionsGetByIdentity from './gestaltsActionsGetByIdentity'; import gestaltsActionsGetByNode from './gestaltsActionsGetByNode'; @@ -84,6 +85,7 @@ const clientManifest = { agentStop, agentUnlock, auditEventsGet, + auditEventsMultiPathGet, auditMetricGet, gestaltsActionsGetByIdentity, gestaltsActionsGetByNode, diff --git a/src/client/handlers/AuditEventsMultiPathGet.ts b/src/client/handlers/AuditEventsMultiPathGet.ts new file mode 100644 index 0000000000..3ab6b9b37e --- /dev/null +++ b/src/client/handlers/AuditEventsMultiPathGet.ts @@ -0,0 +1,117 @@ +import type { ContextTimed } from '@matrixai/contexts'; +import type { ClientRPCRequestParams, ClientRPCResponseResult } from '../types'; +import type { + AuditEvent, + AuditEventSerialized, + AuditEventToAuditEventSerialized, + TopicPath, +} from '../../audit/types'; +import type { Audit } from '../../audit'; +import type { AuditEventId, AuditEventIdEncoded } from '../../ids'; +import { ServerHandler } from '@matrixai/rpc'; +import * as auditUtils from '../../audit/utils'; + +class AuditEventsGet extends ServerHandler< + { + audit: Audit; + }, + ClientRPCRequestParams<{ + paths: Array; + seek?: AuditEventIdEncoded | number; + seekEnd?: AuditEventIdEncoded | number; + order?: 'asc' | 'desc'; + limit?: number; + awaitFutureEvents?: boolean; + }>, + ClientRPCResponseResult +> { + public async *handle( + { + paths, + seek, + seekEnd, + order = 'asc', + limit, + awaitFutureEvents = false, + }: ClientRPCRequestParams<{ + seek?: AuditEventIdEncoded | number; + seekEnd?: AuditEventIdEncoded | number; + order?: 'asc' | 'desc'; + limit?: number; + awaitFutureEvents?: boolean; + }> & { + paths: Array; + }, + _cancel, + _meta, + ctx: ContextTimed, + ): AsyncGenerator< + ClientRPCResponseResult> + > { + const { audit } = this.container; + const iterators: Array> = []; + let seek_: AuditEventId | number | undefined; + if (seek != null) { + seek_ = + typeof seek === 'string' ? auditUtils.decodeAuditEventId(seek) : seek; + } + let seekEnd_: AuditEventId | number | undefined; + if (seekEnd != null) { + seekEnd_ = + typeof seekEnd === 'string' + ? auditUtils.decodeAuditEventId(seekEnd) + : seekEnd; + } + + // Convert the paths + const topicPaths: Array = []; + for (const filteredPath of auditUtils.filterSubPaths(paths)) { + const topicPath = filteredPath.split('.'); + if (auditUtils.isTopicPath(topicPath)) topicPaths.push(topicPath); + } + + // If the call is descending chronologically, or does not want to await future events, + // it should not await future events. + for (const topicPath of topicPaths) { + if (!awaitFutureEvents) { + const iterator = audit.getAuditEvents(topicPath, { + seek: seek_, + seekEnd: seekEnd_, + order: order, + limit: limit, + }); + iterators.push(iterator); + } else { + const iterator = audit.getAuditEventsLongRunning(topicPath, { + seek: seek_, + seekEnd: seekEnd_, + limit: limit, + }); + iterators.push(iterator); + } + } + + // We need to reverse the compare if we are descending in time + const orderSwitchMultiplier = awaitFutureEvents || order === 'asc' ? 1 : -1; + function sortFn(a: AuditEvent, b: AuditEvent) { + return Buffer.compare(a.id, b.id) * orderSwitchMultiplier; + } + + const combinedIterator = auditUtils.genSort( + sortFn, + ...iterators, + ); + ctx.signal.addEventListener('abort', async () => { + await combinedIterator.return(ctx.signal.reason); + }); + for await (const auditEvent of combinedIterator) { + yield { + id: auditUtils.encodeAuditEventId(auditEvent.id), + path: auditEvent.path, + data: auditEvent.data, + }; + } + } +} + +export default AuditEventsGet; diff --git a/src/client/handlers/index.ts b/src/client/handlers/index.ts index 795eba93ee..c1b20b871b 100644 --- a/src/client/handlers/index.ts +++ b/src/client/handlers/index.ts @@ -19,6 +19,9 @@ import AgentLockAll from './AgentLockAll'; import AgentStatus from './AgentStatus'; import AgentStop from './AgentStop'; import AgentUnlock from './AgentUnlock'; +import AuditEventsGet from './AuditEventsGet'; +import AuditEventsMultiPathGet from './AuditEventsMultiPathGet'; +import AuditMetricGet from './AuditMetricGet'; import GestaltsActionsGetByIdentity from './GestaltsActionsGetByIdentity'; import GestaltsActionsGetByNode from './GestaltsActionsGetByNode'; import GestaltsActionsSetByIdentity from './GestaltsActionsSetByIdentity'; @@ -89,8 +92,6 @@ import VaultsSecretsNewDir from './VaultsSecretsNewDir'; import VaultsSecretsRename from './VaultsSecretsRename'; import VaultsSecretsStat from './VaultsSecretsStat'; import VaultsVersion from './VaultsVersion'; -import AuditEventsGet from './AuditEventsGet'; -import AuditMetricGet from './AuditMetricGet'; /** * Server manifest factory. @@ -120,6 +121,7 @@ const serverManifest = (container: { agentStop: new AgentStop(container), agentUnlock: new AgentUnlock(container), auditEventsGet: new AuditEventsGet(container), + auditEventsMultiPathGet: new AuditEventsMultiPathGet(container), auditMetricGet: new AuditMetricGet(container), gestaltsActionsGetByIdentity: new GestaltsActionsGetByIdentity(container), gestaltsActionsGetByNode: new GestaltsActionsGetByNode(container), @@ -205,6 +207,9 @@ export { AgentStatus, AgentStop, AgentUnlock, + AuditEventsGet, + AuditEventsMultiPathGet, + AuditMetricGet, GestaltsActionsGetByIdentity, GestaltsActionsGetByNode, GestaltsActionsSetByIdentity, diff --git a/tests/audit/utils.test.ts b/tests/audit/utils.test.ts new file mode 100644 index 0000000000..4b19182fc0 --- /dev/null +++ b/tests/audit/utils.test.ts @@ -0,0 +1,100 @@ +import fc from 'fast-check'; +import { test } from '@fast-check/jest'; +import * as auditUtils from '@/audit/utils'; + +describe('Audit Utils', () => { + const sortFn = (a: number, b: number): number => { + if (a < b) return -1; + if (a > b) return 1; + return 0; + }; + const orderedNumberArrayArb = fc + .array(fc.integer({ min: 0, max: 100 })) + .map((array) => { + array.sort(sortFn); + return array; + }); + + /** + * Checks if the array is strictly ordered without duplicate numbers + */ + function expectSortedArray(data: Array) { + let previous: number | undefined; + for (const datum of data) { + if (previous == null) { + previous = datum; + continue; + } + if (!(previous <= datum)) { + throw Error(`${previous} was not less than ${datum} in ${data}`); + } + if (previous === datum) { + throw Error(`there should be no duplicate numbers`); + } + } + } + + test('filterSubPaths', async () => { + // Out of theses only `a.b`, `e` and `f` are top level parents + const data = [ + 'a.b.c', + 'a.b.c', + 'a.b.e', + 'e.f', + 'e.g', + 'a.b', + 'e', + 'f', + 'f', + ]; + const filtered = auditUtils.filterSubPaths(data); + expect(filtered).toHaveLength(3); + expect(filtered).toInclude('a.b'); + expect(filtered).toInclude('e'); + expect(filtered).toInclude('f'); + expect(filtered).not.toInclude('a.b.c'); + expect(filtered).not.toInclude('a.b.c'); + expect(filtered).not.toInclude('a.b.e'); + expect(filtered).not.toInclude('e.f'); + expect(filtered).not.toInclude('e.g'); + }); + test.prop([fc.array(orderedNumberArrayArb).noShrink()])( + 'can combine strictly ordered iterators', + async (generatorData) => { + async function* gen( + data: Array, + ): AsyncGenerator { + for (const datum of data) { + yield datum; + } + } + + const expectedData: Set = new Set(); + for (const data of generatorData) { + for (const datum of data) { + expectedData.add(datum); + } + } + const expectedDataArray = [...expectedData]; + expectedDataArray.sort(sortFn); + + const gens = generatorData.map((data) => gen(data)); + const sortedGen = auditUtils.genSort(sortFn, ...gens); + const acc: Array = []; + for await (const value of sortedGen) { + acc.push(value); + } + expectSortedArray(acc); + expect(acc).toMatchObject(expectedDataArray); + }, + ); + test('isAuditPath', async () => { + for (const topicPath of auditUtils.topicPaths) { + expect(auditUtils.isTopicPath(topicPath)).toBeTrue(); + // Parent paths are also valid + expect(auditUtils.isTopicPath(topicPath.slice(0, 2))).toBeTrue(); + expect(auditUtils.isTopicPath(topicPath.slice(0, 1))).toBeTrue(); + } + expect(auditUtils.isTopicPath(['invalid', 'invalid'])).toBeFalse(); + }); +}); diff --git a/tests/client/handlers/audit.test.ts b/tests/client/handlers/audit.test.ts index 4019bd5730..0539b1d3be 100644 --- a/tests/client/handlers/audit.test.ts +++ b/tests/client/handlers/audit.test.ts @@ -20,8 +20,10 @@ import * as discoveryEvents from '@/discovery/events'; import * as networkUtils from '@/network/utils'; import { Audit } from '@/audit'; import AuditMetricGet from '@/client/handlers/AuditMetricGet'; +import AuditEventsMultiPathGet from '@/client/handlers/AuditEventsMultiPathGet'; import AuditEventsGet from '@/client/handlers/AuditEventsGet'; import auditMetricGet from '@/client/callers/auditMetricGet'; +import auditEventsMultiPathGet from '@/client/callers/auditEventsMultiPathGet'; import auditEventsGet from '@/client/callers/auditEventsGet'; import * as testsUtils from '../../utils'; import * as testNodesUtils from '../../nodes/utils'; @@ -49,6 +51,12 @@ describe('auditEventGet', () => { let nodeConnectionManager: NodeConnectionManager; // Event target pretending to be discovery let discovery: Discovery; // Event target pretending to be discovery + const handleEvent = async (evt) => { + // @ts-ignore: kidnap protected handlerMap so we can send events in the foreground + const handlerMap = audit.eventHandlerMap; + await handlerMap.get(evt.constructor)?.handler(evt); + }; + beforeEach(async () => { dataDir = await fs.promises.mkdtemp( path.join(os.tmpdir(), 'polykey-test-'), @@ -103,6 +111,7 @@ describe('auditEventGet', () => { rpcClient = new RPCClient({ manifest: { auditEventsGet, + auditEventsMultiPathGet, }, streamFactory: () => webSocketClient.connection.newStream(), toError: networkUtils.toError, @@ -119,6 +128,7 @@ describe('auditEventGet', () => { recursive: true, }); }); + test('cancels', async () => { let callerInterface = await rpcClient.methods.auditEventsGet({ path: [], @@ -145,15 +155,11 @@ describe('auditEventGet', () => { ...eventDetail, remoteNodeId: nodesUtils.encodeNodeId(eventDetail.remoteNodeId), }; - // @ts-ignore: kidnap protected - const handlerMap = audit.eventHandlerMap; - await handlerMap - .get(nodesEvents.EventNodeConnectionManagerConnectionReverse) - ?.handler( - new nodesEvents.EventNodeConnectionManagerConnectionReverse({ - detail: eventDetail, - }), - ); + await handleEvent( + new nodesEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); let callerInterface: any = await rpcClient.methods.auditEventsGet({ path: ['node', 'connection', 'reverse'], }); @@ -171,13 +177,11 @@ describe('auditEventGet', () => { ...auditEventData, type: 'reverse', }); - await handlerMap - .get(nodesEvents.EventNodeConnectionManagerConnectionForward) - ?.handler( - new nodesEvents.EventNodeConnectionManagerConnectionForward({ - detail: eventDetail, - }), - ); + await handleEvent( + new nodesEvents.EventNodeConnectionManagerConnectionForward({ + detail: eventDetail, + }), + ); await expect(reader.read().then((e) => e.value!.data)).resolves.toEqual({ ...auditEventData, type: 'forward', @@ -185,11 +189,229 @@ describe('auditEventGet', () => { }); test('gets discovery events', async () => { // Set up some events + await handleEvent( + new discoveryEvents.EventDiscoveryVertexQueued({ + detail: { + vertex: 'vertex1' as GestaltIdEncoded, + }, + }), + ); + await handleEvent( + new discoveryEvents.EventDiscoveryVertexQueued({ + detail: { + vertex: 'vertex2' as GestaltIdEncoded, + parent: 'vertex1' as GestaltIdEncoded, + }, + }), + ); + await handleEvent( + new discoveryEvents.EventDiscoveryVertexQueued({ + detail: { + vertex: 'vertex3' as GestaltIdEncoded, + parent: 'vertex1' as GestaltIdEncoded, + }, + }), + ); + await handleEvent( + new discoveryEvents.EventDiscoveryVertexProcessed({ + detail: { + vertex: 'vertex1' as GestaltIdEncoded, + }, + }), + ); + await handleEvent( + new discoveryEvents.EventDiscoveryVertexFailed({ + detail: { + vertex: 'vertex2' as GestaltIdEncoded, + parent: 'vertex1' as GestaltIdEncoded, + code: 255, + message: 'some message', + }, + }), + ); + await handleEvent( + new discoveryEvents.EventDiscoveryVertexCancelled({ + detail: { + vertex: 'vertex3' as GestaltIdEncoded, + parent: 'vertex1' as GestaltIdEncoded, + }, + }), + ); + + const readableStream = await rpcClient.methods.auditEventsGet({ + path: ['discovery', 'vertex'], + }); + const results: Array = []; + for await (const result of readableStream) { + results.push(result); + } + expect(results).toHaveLength(6); + }); +}); + +describe('auditEventMultiPathGet', () => { + const logger = new Logger('auditEventsGet test', LogLevel.WARN, [ + new StreamHandler( + formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`, + ), + ]); + const password = 'password'; + const localhost = '127.0.0.1'; + let audit: Audit; + let dataDir: string; + let db: DB; + let keyRing: KeyRing; + let clientService: ClientService; + let webSocketClient: WebSocketClient; + let rpcClient: OverrideRPClientType< + RPCClient<{ + auditEventsMultiPathGet: typeof auditEventsMultiPathGet; + }> + >; + let tlsConfig: TLSConfig; + let nodeConnectionManager: NodeConnectionManager; // Event target pretending to be discovery + let discovery: Discovery; // Event target pretending to be discovery + + const handleEvent = async (evt) => { // @ts-ignore: kidnap protected handlerMap so we can send events in the foreground const handlerMap = audit.eventHandlerMap; - const handleEvent = async (evt) => { - await handlerMap.get(evt.constructor)?.handler(evt); + await handlerMap.get(evt.constructor)?.handler(evt); + }; + + beforeEach(async () => { + dataDir = await fs.promises.mkdtemp( + path.join(os.tmpdir(), 'polykey-test-'), + ); + const keysPath = path.join(dataDir, 'keys'); + const dbPath = path.join(dataDir, 'db'); + db = await DB.createDB({ + dbPath, + logger, + }); + keyRing = await KeyRing.createKeyRing({ + password, + keysPath, + passwordOpsLimit: keysUtils.passwordOpsLimits.min, + passwordMemLimit: keysUtils.passwordMemLimits.min, + strictMemoryLock: false, + logger, + }); + tlsConfig = await testsUtils.createTLSConfig(keyRing.keyPair); + clientService = new ClientService({ + tlsConfig, + logger: logger.getChild(ClientService.name), + }); + nodeConnectionManager = new EventTarget() as any; + discovery = new EventTarget() as any; + audit = await Audit.createAudit({ + db, + nodeConnectionManager, + discovery, + logger: logger.getChild(Audit.name), + }); + clientService = new ClientService({ + tlsConfig, + logger: logger.getChild(ClientService.name), + }); + await clientService.start({ + manifest: { + auditEventsGet: new AuditEventsGet({ + audit, + }), + auditEventsMultiPathGet: new AuditEventsMultiPathGet({ + audit, + }), + }, + host: localhost, + }); + webSocketClient = await WebSocketClient.createWebSocketClient({ + config: { + verifyPeer: false, + }, + host: localhost, + logger: logger.getChild(WebSocketClient.name), + port: clientService.port, + }); + rpcClient = new RPCClient({ + manifest: { + auditEventsMultiPathGet, + }, + streamFactory: () => webSocketClient.connection.newStream(), + toError: networkUtils.toError, + logger: logger.getChild(RPCClient.name), + }) as any; + }); + afterEach(async () => { + await clientService.stop({ force: true }); + await webSocketClient.destroy({ force: true }); + await keyRing.stop(); + await audit.stop(); + await fs.promises.rm(dataDir, { + force: true, + recursive: true, + }); + }); + + test('cancels', async () => { + let callerInterface = await rpcClient.methods.auditEventsMultiPathGet({ + paths: [], + }); + let reader = callerInterface.getReader(); + await reader.cancel(); + await expect(reader.closed).toResolve(); + callerInterface = await rpcClient.methods.auditEventsMultiPathGet({ + paths: [], + awaitFutureEvents: true, + }); + reader = callerInterface.getReader(); + await reader.cancel(); + await expect(reader.closed).toResolve(); + }); + test('gets connection events', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + const auditEventData = { + ...eventDetail, + remoteNodeId: nodesUtils.encodeNodeId(eventDetail.remoteNodeId), }; + await handleEvent( + new nodesEvents.EventNodeConnectionManagerConnectionReverse({ + detail: eventDetail, + }), + ); + let callerInterface: any = await rpcClient.methods.auditEventsMultiPathGet({ + paths: ['node.connection.reverse'], + }); + let reader = callerInterface.getReader(); + await expect(reader.read().then((e) => e.value!.data)).resolves.toEqual({ + ...auditEventData, + type: 'reverse', + }); + callerInterface = await rpcClient.methods.auditEventsMultiPathGet({ + paths: ['node.connection'], + awaitFutureEvents: true, + }); + reader = callerInterface.getReader(); + await expect(reader.read().then((e) => e.value!.data)).resolves.toEqual({ + ...auditEventData, + type: 'reverse', + }); + await handleEvent( + new nodesEvents.EventNodeConnectionManagerConnectionForward({ + detail: eventDetail, + }), + ); + await expect(reader.read().then((e) => e.value!.data)).resolves.toEqual({ + ...auditEventData, + type: 'forward', + }); + }); + test('gets discovery events', async () => { + // Set up some events await handleEvent( new discoveryEvents.EventDiscoveryVertexQueued({ detail: { @@ -239,8 +461,8 @@ describe('auditEventGet', () => { }), ); - const readableStream = await rpcClient.methods.auditEventsGet({ - path: ['discovery', 'vertex'], + const readableStream = await rpcClient.methods.auditEventsMultiPathGet({ + paths: ['discovery.vertex'], }); const results: Array = []; for await (const result of readableStream) { @@ -248,6 +470,120 @@ describe('auditEventGet', () => { } expect(results).toHaveLength(6); }); + test('can get multiple paths in ascending order', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + await handleEvent( + new nodesEvents.EventNodeConnectionManagerConnectionReverse({ + detail: { + ...eventDetail, + remotePort: 1 as Port, + }, + }), + ); + await handleEvent( + new nodesEvents.EventNodeConnectionManagerConnectionForward({ + detail: { + ...eventDetail, + remotePort: 2 as Port, + }, + }), + ); + await handleEvent( + new nodesEvents.EventNodeConnectionManagerConnectionReverse({ + detail: { + ...eventDetail, + remotePort: 3 as Port, + }, + }), + ); + await handleEvent( + new nodesEvents.EventNodeConnectionManagerConnectionForward({ + detail: { + ...eventDetail, + remotePort: 4 as Port, + }, + }), + ); + const callerInterface: any = + await rpcClient.methods.auditEventsMultiPathGet({ + paths: ['node.connection', 'node.connection.forward'], + order: 'asc', + }); + const order: Array = []; + const pathSet: Set = new Set(); + for await (const result of callerInterface) { + order.push(result.data.remotePort); + pathSet.add(result.path.join('.')); + } + expect(order).toMatchObject([1, 2, 3, 4]); + expect([...pathSet]).toIncludeAllMembers([ + 'node.connection.reverse', + 'node.connection.forward', + ]); + expect(pathSet.size).toBe(2); + }); + test('can get multiple paths in descending order', async () => { + const nodeId = testNodesUtils.generateRandomNodeId(); + const eventDetail: ConnectionData = { + remoteHost: '::' as Host, + remoteNodeId: nodeId, + remotePort: 0 as Port, + }; + await handleEvent( + new nodesEvents.EventNodeConnectionManagerConnectionReverse({ + detail: { + ...eventDetail, + remotePort: 1 as Port, + }, + }), + ); + await handleEvent( + new nodesEvents.EventNodeConnectionManagerConnectionForward({ + detail: { + ...eventDetail, + remotePort: 2 as Port, + }, + }), + ); + await handleEvent( + new nodesEvents.EventNodeConnectionManagerConnectionReverse({ + detail: { + ...eventDetail, + remotePort: 3 as Port, + }, + }), + ); + await handleEvent( + new nodesEvents.EventNodeConnectionManagerConnectionForward({ + detail: { + ...eventDetail, + remotePort: 4 as Port, + }, + }), + ); + const callerInterface: any = + await rpcClient.methods.auditEventsMultiPathGet({ + paths: ['node.connection', 'node.connection.forward'], + order: 'desc', + }); + const order: Array = []; + const pathSet: Set = new Set(); + for await (const result of callerInterface) { + order.push(result.data.remotePort); + pathSet.add(result.path.join('.')); + } + expect(order).toMatchObject([4, 3, 2, 1]); + expect([...pathSet]).toIncludeAllMembers([ + 'node.connection.reverse', + 'node.connection.forward', + ]); + expect(pathSet.size).toBe(2); + }); }); describe('auditMetricGet', () => { diff --git a/tests/git/http.test.ts b/tests/git/http.test.ts index a5d816c805..0bb81b6965 100644 --- a/tests/git/http.test.ts +++ b/tests/git/http.test.ts @@ -154,7 +154,7 @@ describe('Git Http', () => { 'parsePackRequest handles random data', async (data) => { const bufferData = Buffer.from(data); - fc.pre(!/^[0-9a-f]{4}$/.test(bufferData.subarray(0, 4).toString())) + fc.pre(!/^[0-9a-f]{4}$/.test(bufferData.subarray(0, 4).toString())); await expect(gitHttp.parsePackRequest([bufferData])).rejects.toThrow( validationErrors.ErrorParse, ); diff --git a/tests/git/utils.test.ts b/tests/git/utils.test.ts index d604c27b2a..69b960bfd0 100644 --- a/tests/git/utils.test.ts +++ b/tests/git/utils.test.ts @@ -241,7 +241,7 @@ describe('Git utils', () => { 'parseRequestLine handles bad data', async (randomData) => { const bufferData = Buffer.from(randomData); - fc.pre(!/^[0-9a-f]{4}$/.test(bufferData.subarray(0, 4).toString())) + fc.pre(!/^[0-9a-f]{4}$/.test(bufferData.subarray(0, 4).toString())); expect(() => gitUtils.parseRequestLine(bufferData)).toThrow( validationErrors.ErrorParse, );