From 0947cc639300555e51fdbe58fc48bceb6fb20965 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Fri, 26 Apr 2024 12:40:45 +1000 Subject: [PATCH] feat: adding a generalized `audit` domain command for inspecting audit events [ci skip] --- src/audit/CommandAudit.ts | 106 +++++++++++ src/audit/index.ts | 1 + src/identities/CommandDiscover.ts | 19 +- src/polykey.ts | 2 + src/utils/options.ts | 63 ++++++ tests/audit/audit.test.ts | 307 ++++++++++++++++++++++++++++++ 6 files changed, 495 insertions(+), 3 deletions(-) create mode 100644 src/audit/CommandAudit.ts create mode 100644 src/audit/index.ts create mode 100644 tests/audit/audit.test.ts diff --git a/src/audit/CommandAudit.ts b/src/audit/CommandAudit.ts new file mode 100644 index 00000000..ccfc39f8 --- /dev/null +++ b/src/audit/CommandAudit.ts @@ -0,0 +1,106 @@ +import type PolykeyClient from 'polykey/dist/PolykeyClient'; +import * as binOptions from '../utils/options'; +import * as binProcessors from '../utils/processors'; +import * as binUtils from '../utils'; +import CommandPolykey from '../CommandPolykey'; + +class CommandIdentities extends CommandPolykey { + constructor(...args: ConstructorParameters) { + super(...args); + this.name('audit'); + this.description('Displays audit event history'); + this.addOption(binOptions.nodeId); + this.addOption(binOptions.clientHost); + this.addOption(binOptions.clientPort); + this.addOption(binOptions.seekStart); + this.addOption(binOptions.seekEnd); + this.addOption(binOptions.follow); + this.addOption(binOptions.events); + this.addOption(binOptions.limit); + this.addOption(binOptions.order); + this.action(async (options) => { + const { default: PolykeyClient } = await import( + 'polykey/dist/PolykeyClient' + ); + const auditUtils = await import('polykey/dist/audit/utils'); + const clientOptions = await binProcessors.processClientOptions( + options.nodePath, + options.nodeId, + options.clientHost, + options.clientPort, + this.fs, + this.logger.getChild(binProcessors.processClientOptions.name), + ); + const auth = await binProcessors.processAuthentication( + options.passwordFile, + this.fs, + ); + let pkClient: PolykeyClient; + this.exitHandlers.handlers.push(async () => { + if (pkClient != null) await pkClient.stop(); + }); + try { + pkClient = await PolykeyClient.createPolykeyClient({ + nodeId: clientOptions.nodeId, + host: clientOptions.clientHost, + port: clientOptions.clientPort, + options: { + nodePath: options.nodePath, + }, + logger: this.logger.getChild(PolykeyClient.name), + }); + // Creating an infinite timer to hold the process open + const holdOpenTimer = setTimeout(() => {}, 2 ** 30); + // We set up the readable stream watching the discovery events here + await binUtils + .retryAuthentication(async (auth) => { + const seek: number = options.seekStart; + const seekEnd: number | undefined = options.seekEnd; + const order: 'asc' | 'desc' = options.order; + const limit: number | undefined = options.limit; + const awaitFutureEvents = options.follow; + const readableStream = + await pkClient.rpcClient.methods.auditEventsGet({ + awaitFutureEvents, + paths: auditUtils.filterSubPaths(options.events ?? [[]]), + seek, + seekEnd, + order, + limit, + metadata: auth, + }); + // Tracks vertices that are relevant to our current search + for await (const result of readableStream) { + const sanitizedResult = { + id: result.id, + path: result.path.join('.'), + data: result.data, + }; + if (options.format === 'json') { + process.stdout.write( + binUtils.outputFormatter({ + type: 'json', + data: sanitizedResult, + }), + ); + } else { + process.stdout.write( + binUtils.outputFormatter({ + type: 'dict', + data: { '>': sanitizedResult }, + }), + ); + } + } + }, auth) + .finally(() => { + clearTimeout(holdOpenTimer); + }); + } finally { + if (pkClient! != null) await pkClient.stop(); + } + }); + } +} + +export default CommandIdentities; diff --git a/src/audit/index.ts b/src/audit/index.ts new file mode 100644 index 00000000..b08d99a0 --- /dev/null +++ b/src/audit/index.ts @@ -0,0 +1 @@ +export { default } from './CommandAudit'; diff --git a/src/identities/CommandDiscover.ts b/src/identities/CommandDiscover.ts index d346c2db..071cc397 100644 --- a/src/identities/CommandDiscover.ts +++ b/src/identities/CommandDiscover.ts @@ -1,5 +1,9 @@ import type PolykeyClient from 'polykey/dist/PolykeyClient'; import type { GestaltId } from 'polykey/dist/gestalts/types'; +import type { + AuditEventDiscoveryVertex, + AuditEventToAuditEventSerialized, +} from 'polykey/dist/audit/types'; import CommandPolykey from '../CommandPolykey'; import * as binOptions from '../utils/options'; import * as binUtils from '../utils'; @@ -63,7 +67,7 @@ class CommandDiscover extends CommandPolykey { const readableStream = await pkClient.rpcClient.methods.auditEventsGet({ awaitFutureEvents: true, - path: ['discovery', 'vertex'], + paths: [['discovery', 'vertex']], seek: Date.now(), metadata: auth, }); @@ -74,8 +78,17 @@ class CommandDiscover extends CommandPolykey { // Adding the initial vertex relevantSet.add(gestaltUtils.encodeGestaltId(gestaltId)); for await (const result of readableStream) { - const event = result.path[2]; - const { vertex, parent } = result.data; + if ( + result.path[0] !== 'discovery' || + result.path[1] !== 'vertex' + ) { + utils.never('Should be a discovery vertex event'); + } + // We're only requesting discovery vertex events, so we need to re-cast the type here + const resultTyped = + result as AuditEventToAuditEventSerialized; + const event = resultTyped.path[2]; + const { vertex, parent } = resultTyped.data; // Skip if the vertex and parent are not relevant if ( !relevantSet.has(vertex) && diff --git a/src/polykey.ts b/src/polykey.ts index d1c948bb..67b90de4 100755 --- a/src/polykey.ts +++ b/src/polykey.ts @@ -143,6 +143,7 @@ async function polykeyMain(argv: Array): Promise { const { default: config } = await import('polykey/dist/config'); const { default: CommandBootstrap } = await import('./bootstrap'); const { default: CommandAgent } = await import('./agent'); + const { default: CommandAudit } = await import('./audit'); const { default: CommandVaults } = await import('./vaults'); const { default: CommandSecrets } = await import('./secrets'); const { default: CommandKeys } = await import('./keys'); @@ -164,6 +165,7 @@ async function polykeyMain(argv: Array): Promise { rootCommand.description('Polykey CLI'); rootCommand.addCommand(new CommandBootstrap({ exitHandlers, fs })); rootCommand.addCommand(new CommandAgent({ exitHandlers, fs })); + rootCommand.addCommand(new CommandAudit({ exitHandlers, fs })); rootCommand.addCommand(new CommandNodes({ exitHandlers, fs })); rootCommand.addCommand(new CommandSecrets({ exitHandlers, fs })); rootCommand.addCommand(new CommandKeys({ exitHandlers, fs })); diff --git a/src/utils/options.ts b/src/utils/options.ts index 78906cc4..b178f270 100644 --- a/src/utils/options.ts +++ b/src/utils/options.ts @@ -238,6 +238,63 @@ const discoveryMonitor = new commander.Option( 'Enabling monitoring will cause discover to output discovery events as they happen and will exit once all children are processed', ).default(false); +const parseDate = (value: string): number => { + if (value.toLowerCase() === 'now') return Date.now(); + const date = Date.parse(value); + if (isNaN(date)) throw Error('Invalid data'); + return date; +}; + +const seekStart = new commander.Option( + '--seek-start [seekStart]', + `time to start seeking from`, +) + .argParser(parseDate) + .default(0); + +const seekEnd = new commander.Option( + '--seek-end [seekEnd]', + `time to seek until`, +) + .argParser(parseDate) + .default(undefined); + +const follow = new commander.Option( + '--follow', + 'If enabled, future events will be outputted as they happen', +).default(false); + +const events = new commander.Option( + '--events [events...]', + 'Filter for specified event paths', +) + .argParser( + ( + value: string, + previous: Array> | undefined, + ): Array> => { + const parsedPath = value.split('.'); + const out = previous ?? []; + out.push(parsedPath); + return out; + }, + ) + .default(undefined); + +const limit = new commander.Option( + '--limit [limit]', + 'Limit the number of emitted events', +) + .argParser(parseInt) + .default(undefined); + +const order = new commander.Option( + '--order [order]', + 'Filter for specified events', +) + .choices(['asc', 'desc']) + .default('asc'); + export { nodePath, format, @@ -272,4 +329,10 @@ export { envInvalid, envDuplicate, discoveryMonitor, + seekStart, + seekEnd, + follow, + events, + limit, + order, }; diff --git a/tests/audit/audit.test.ts b/tests/audit/audit.test.ts new file mode 100644 index 00000000..9b8429e8 --- /dev/null +++ b/tests/audit/audit.test.ts @@ -0,0 +1,307 @@ +import type { GestaltIdEncoded } from 'polykey/dist/gestalts/types'; +import path from 'path'; +import fs from 'fs'; +import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; +import PolykeyAgent from 'polykey/dist/PolykeyAgent'; +import * as identitiesUtils from 'polykey/dist/identities/utils'; +import * as keysUtils from 'polykey/dist/keys/utils'; +import * as discoveryEvents from 'polykey/dist/discovery/events'; +import { sleep } from 'polykey/dist/utils'; +import * as testUtils from '../utils'; + +// @ts-ignore: stub out method +identitiesUtils.browser = () => {}; + +describe('audit', () => { + const logger = new Logger('audit test', LogLevel.WARN, [new StreamHandler()]); + const password = 'password'; + let dataDir: string; + let nodePath: string; + let pkAgent: PolykeyAgent; + let handleEvent: (evt) => Promise; + let processVertex: ( + parent: string | undefined, + children: Array, + ) => Promise; + beforeEach(async () => { + dataDir = await fs.promises.mkdtemp( + path.join(globalThis.tmpDir, 'polykey-test-'), + ); + // Set up the remote gestalt state here + // Setting up remote nodes + nodePath = path.join(dataDir, 'polykey'); + // Cannot use global shared agent since we need to register a provider + pkAgent = await PolykeyAgent.createPolykeyAgent({ + password, + options: { + nodePath, + agentServiceHost: '127.0.0.1', + clientServiceHost: '127.0.0.1', + keys: { + passwordOpsLimit: keysUtils.passwordOpsLimits.min, + passwordMemLimit: keysUtils.passwordMemLimits.min, + strictMemoryLock: false, + }, + }, + logger, + }); + + const audit = pkAgent.audit; + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + handleEvent = async (evt) => { + await handlerMap.get(evt.constructor)!.handler(evt); + }; + processVertex = async ( + parent: string | undefined, + children: Array, + ) => { + for (const child of children) { + await handleEvent( + new discoveryEvents.EventDiscoveryVertexQueued({ + detail: { + vertex: child as GestaltIdEncoded, + parent: parent as GestaltIdEncoded, + }, + }), + ); + } + if (parent != null) { + await handleEvent( + new discoveryEvents.EventDiscoveryVertexProcessed({ + detail: { + vertex: parent as GestaltIdEncoded, + }, + }), + ); + } + }; + }); + afterEach(async () => { + await pkAgent.stop(); + await fs.promises.rm(dataDir, { + force: true, + recursive: true, + }); + }); + test('should get all events', async () => { + // Start of with mocking some existing discovery events + await processVertex(undefined, ['node-A']); + await processVertex('node-A', ['node-B', 'node-C']); + await processVertex('node-B', ['node-D']); + await processVertex('node-C', []); + await processVertex('node-D', []); + // Checking response + const discoverResponse = await testUtils.pkExec(['audit'], { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }); + expect(discoverResponse.stdout).toIncludeMultiple([ + 'discovery.vertex.queued', + 'discovery.vertex.processed', + 'node-A', + 'node-B', + 'node-C', + 'node-D', + ]); + expect(discoverResponse.exitCode).toBe(0); + }); + test('should get specific events', async () => { + // Start of with mocking some existing discovery events + await processVertex(undefined, ['node-A']); + await processVertex('node-A', ['node-B', 'node-C']); + await processVertex('node-B', ['node-D']); + await processVertex('node-C', []); + await processVertex('node-D', []); + // Checking response + const discoverResponse1 = await testUtils.pkExec( + ['audit', '--events', 'discovery.vertex.queued'], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + expect(discoverResponse1.stdout).toIncludeMultiple([ + 'discovery.vertex.queued', + 'node-A', + 'node-B', + 'node-C', + 'node-D', + ]); + expect(discoverResponse1.stdout).not.toInclude('processed'); + expect(discoverResponse1.exitCode).toBe(0); + + const discoverResponse2 = await testUtils.pkExec( + ['audit', '--events', 'discovery.vertex.processed'], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + expect(discoverResponse2.stdout).toIncludeMultiple([ + 'discovery.vertex.processed', + 'node-A', + 'node-B', + 'node-C', + 'node-D', + ]); + expect(discoverResponse2.stdout).not.toInclude('discovery.vertex.queued'); + expect(discoverResponse2.exitCode).toBe(0); + + const discoverResponse3 = await testUtils.pkExec( + [ + 'audit', + '--events', + 'discovery.vertex.processed', + 'discovery.vertex.queued', + ], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + expect(discoverResponse3.stdout).toIncludeMultiple([ + 'discovery.vertex.processed', + 'discovery.vertex.queued', + 'node-A', + 'node-B', + 'node-C', + 'node-D', + ]); + expect(discoverResponse3.exitCode).toBe(0); + }); + test('should seek from seekStart', async () => { + // Start of with mocking some existing discovery events + await processVertex('node-A', ['node-AA']); + await processVertex('node-B', ['node-BA']); + await processVertex('node-C', ['node-CA']); + await sleep(50); + const date = new Date(); + await sleep(50); + await processVertex('node-E', ['node-EA']); + await processVertex('node-F', ['node-FA']); + await processVertex('node-G', ['node-GA']); + // Checking response + const discoverResponse1 = await testUtils.pkExec( + ['audit', '--seek-start', date.toISOString()], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + expect(discoverResponse1.stdout).not.toInclude('node-A'); + expect(discoverResponse1.stdout).not.toInclude('node-AA'); + expect(discoverResponse1.stdout).not.toInclude('node-B'); + expect(discoverResponse1.stdout).not.toInclude('node-BA'); + expect(discoverResponse1.stdout).not.toInclude('node-C'); + expect(discoverResponse1.stdout).not.toInclude('node-CA'); + expect(discoverResponse1.stdout).toIncludeMultiple([ + 'queued', + 'processed', + 'node-E', + 'node-EA', + 'node-F', + 'node-FA', + 'node-G', + 'node-GA', + ]); + expect(discoverResponse1.exitCode).toBe(0); + }); + test('should seek until seekEnd', async () => { + // Start of with mocking some existing discovery events + await processVertex('node-A', ['node-AA']); + await processVertex('node-B', ['node-BA']); + await processVertex('node-C', ['node-CA']); + await sleep(50); + const date = new Date(); + await sleep(50); + await processVertex('node-E', ['node-EA']); + await processVertex('node-F', ['node-FA']); + await processVertex('node-G', ['node-GA']); + // Checking response + const discoverResponse1 = await testUtils.pkExec( + ['audit', '--seek-end', date.toISOString()], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + expect(discoverResponse1.stdout).toIncludeMultiple([ + 'queued', + 'processed', + 'node-A', + 'node-AA', + 'node-B', + 'node-BA', + 'node-C', + 'node-CA', + ]); + expect(discoverResponse1.stdout).not.toInclude('node-E'); + expect(discoverResponse1.stdout).not.toInclude('node-EA'); + expect(discoverResponse1.stdout).not.toInclude('node-F'); + expect(discoverResponse1.stdout).not.toInclude('node-FA'); + expect(discoverResponse1.stdout).not.toInclude('node-G'); + expect(discoverResponse1.stdout).not.toInclude('node-GA'); + expect(discoverResponse1.exitCode).toBe(0); + }); + test('should seek until limit', async () => { + // Start of with mocking some existing discovery events + await processVertex(undefined, ['node-A']); + await processVertex(undefined, ['node-A']); + await processVertex(undefined, ['node-A']); + await processVertex(undefined, ['node-A']); + // Checking response + const discoverResponse = await testUtils.pkExec(['audit', '--limit', '2'], { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }); + expect(discoverResponse.stdout).toIncludeRepeated('queued', 2); + expect(discoverResponse.stdout).toIncludeRepeated('node-A', 2); + expect(discoverResponse.exitCode).toBe(0); + }); + test('should await future events', async () => { + // Start of with mocking some existing discovery events + await processVertex('node-A', ['node-AA']); + await processVertex('node-B', ['node-BA']); + await processVertex('node-C', ['node-CA']); + await sleep(100); + const discoverResponseP = testUtils.pkExec( + ['audit', '--follow', '--limit', '12'], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + await sleep(100); + await processVertex('node-E', ['node-EA']); + await processVertex('node-F', ['node-FA']); + await processVertex('node-G', ['node-GA']); + // Checking response + const discoverResponse = await discoverResponseP; + expect(discoverResponse.exitCode).toBe(0); + }); +});