diff --git a/src/audit/CommandAudit.ts b/src/audit/CommandAudit.ts new file mode 100644 index 00000000..3a996888 --- /dev/null +++ b/src/audit/CommandAudit.ts @@ -0,0 +1,13 @@ +import CommandDiscovery from './CommandDiscovery'; +import CommandPolykey from '../CommandPolykey'; + +class CommandIdentities extends CommandPolykey { + constructor(...args: ConstructorParameters) { + super(...args); + this.name('audit'); + this.description('Auditing'); + this.addCommand(new CommandDiscovery(...args)); + } +} + +export default CommandIdentities; diff --git a/src/audit/CommandDiscovery.ts b/src/audit/CommandDiscovery.ts new file mode 100644 index 00000000..f3d3c4bf --- /dev/null +++ b/src/audit/CommandDiscovery.ts @@ -0,0 +1,117 @@ +import type PolykeyClient from 'polykey/dist/PolykeyClient'; +import CommandPolykey from '../CommandPolykey'; +import * as binOptions from '../utils/options'; +import * as binUtils from '../utils'; +import * as binProcessors from '../utils/processors'; + +class CommandDiscover extends CommandPolykey { + constructor(...args: ConstructorParameters) { + super(...args); + this.name('discovery'); + this.description('Displays discovery event history'); + this.addOption(binOptions.nodeId); + this.addOption(binOptions.clientHost); + this.addOption(binOptions.clientPort); + this.addOption(binOptions.seekStart); + this.addOption(binOptions.seekEnd); + this.addOption(binOptions.futureEvents); + this.addOption(binOptions.discoveryEvents); + this.addOption(binOptions.limit); + this.addOption(binOptions.order); + this.action(async (options) => { + const { default: PolykeyClient } = await import( + 'polykey/dist/PolykeyClient' + ); + const clientOptions = await binProcessors.processClientOptions( + options.nodePath, + options.nodeId, + options.clientHost, + options.clientPort, + this.fs, + this.logger.getChild(binProcessors.processClientOptions.name), + ); + const auth = await binProcessors.processAuthentication( + options.passwordFile, + this.fs, + ); + let pkClient: PolykeyClient; + this.exitHandlers.handlers.push(async () => { + if (pkClient != null) await pkClient.stop(); + }); + try { + pkClient = await PolykeyClient.createPolykeyClient({ + nodeId: clientOptions.nodeId, + host: clientOptions.clientHost, + port: clientOptions.clientPort, + options: { + nodePath: options.nodePath, + }, + logger: this.logger.getChild(PolykeyClient.name), + }); + // Creating an infinite timer to hold the process open + const holdOpenTimer = setTimeout(() => {}, 2 ** 30); + // We set up the readable stream watching the discovery events here + await binUtils + .retryAuthentication(async (auth) => { + const seek: number = options.seekStart; + const seekEnd: number | undefined = options.seekEnd; + const order: 'asc' | 'desc' = options.order; + const limit: number | undefined = options.limit; + const events: Array< + 'queued' | 'processed' | 'cancelled' | 'failed' + > = options.discoveryEvents; + const awaitFutureEvents = options.futureEvents; + const readableStream = + await pkClient.rpcClient.methods.auditEventsGet({ + awaitFutureEvents, + path: ['discovery', 'vertex'], + seek, + seekEnd, + order, + limit, + metadata: auth, + }); + // Tracks vertices that are relevant to our current search + for await (const result of readableStream) { + const event = result.path[2]; + const { vertex } = result.data; + // Don't emit events we're not filtering for + if (events != null && !(>events).includes(event)) { + continue; + } + const data = { + event, + vertex, + }; + if (options.format === 'json') { + process.stdout.write( + binUtils.outputFormatter({ + type: 'json', + data, + }), + ); + } else { + process.stdout.write( + binUtils.outputFormatter({ + type: 'list', + data: [ + `${data.event}${' '.repeat(15 - data.event.length)}${ + data.vertex + }`, + ], + }), + ); + } + } + }, auth) + .finally(() => { + clearTimeout(holdOpenTimer); + }); + } finally { + if (pkClient! != null) await pkClient.stop(); + } + }); + } +} + +export default CommandDiscover; diff --git a/src/audit/index.ts b/src/audit/index.ts new file mode 100644 index 00000000..b08d99a0 --- /dev/null +++ b/src/audit/index.ts @@ -0,0 +1 @@ +export { default } from './CommandAudit'; diff --git a/src/polykey.ts b/src/polykey.ts index d1c948bb..67b90de4 100755 --- a/src/polykey.ts +++ b/src/polykey.ts @@ -143,6 +143,7 @@ async function polykeyMain(argv: Array): Promise { const { default: config } = await import('polykey/dist/config'); const { default: CommandBootstrap } = await import('./bootstrap'); const { default: CommandAgent } = await import('./agent'); + const { default: CommandAudit } = await import('./audit'); const { default: CommandVaults } = await import('./vaults'); const { default: CommandSecrets } = await import('./secrets'); const { default: CommandKeys } = await import('./keys'); @@ -164,6 +165,7 @@ async function polykeyMain(argv: Array): Promise { rootCommand.description('Polykey CLI'); rootCommand.addCommand(new CommandBootstrap({ exitHandlers, fs })); rootCommand.addCommand(new CommandAgent({ exitHandlers, fs })); + rootCommand.addCommand(new CommandAudit({ exitHandlers, fs })); rootCommand.addCommand(new CommandNodes({ exitHandlers, fs })); rootCommand.addCommand(new CommandSecrets({ exitHandlers, fs })); rootCommand.addCommand(new CommandKeys({ exitHandlers, fs })); diff --git a/src/utils/options.ts b/src/utils/options.ts index 78906cc4..8a32c209 100644 --- a/src/utils/options.ts +++ b/src/utils/options.ts @@ -238,6 +238,53 @@ const discoveryMonitor = new commander.Option( 'Enabling monitoring will cause discover to output discovery events as they happen and will exit once all children are processed', ).default(false); +const parseDate = (value: string): number => { + if (value.toLowerCase() === 'now') return Date.now(); + const date = Date.parse(value); + if (isNaN(date)) throw Error('Invalid data'); + return date; +}; + +const seekStart = new commander.Option( + '--seek-start [seekStart]', + `time to start seeking from`, +) + .argParser(parseDate) + .default(0); + +const seekEnd = new commander.Option( + '--seek-end [seekEnd]', + `time to seek until`, +) + .argParser(parseDate) + .default(undefined); + +const futureEvents = new commander.Option( + '--future-events', + 'If enabled, future events will be outputted as they happen', +).default(false); + +const discoveryEvents = new commander.Option( + '--discovery-events [discoveryEvents...]', + 'Filter for specified events', +) + .choices(['queued', 'processed', 'cancelled', 'failed']) + .default(undefined); + +const limit = new commander.Option( + '--limit [limit]', + 'Limit the number of emitted events', +) + .argParser(parseInt) + .default(undefined); + +const order = new commander.Option( + '--order [order]', + 'Filter for specified events', +) + .choices(['asc', 'desc']) + .default('asc'); + export { nodePath, format, @@ -272,4 +319,10 @@ export { envInvalid, envDuplicate, discoveryMonitor, + seekStart, + seekEnd, + futureEvents, + discoveryEvents, + limit, + order, }; diff --git a/tests/audit/discovery.test.ts b/tests/audit/discovery.test.ts new file mode 100644 index 00000000..a4b003a3 --- /dev/null +++ b/tests/audit/discovery.test.ts @@ -0,0 +1,307 @@ +import type { GestaltIdEncoded } from 'polykey/dist/gestalts/types'; +import path from 'path'; +import fs from 'fs'; +import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; +import PolykeyAgent from 'polykey/dist/PolykeyAgent'; +import * as identitiesUtils from 'polykey/dist/identities/utils'; +import * as keysUtils from 'polykey/dist/keys/utils'; +import * as discoveryEvents from 'polykey/dist/discovery/events'; +import { sleep } from 'polykey/dist/utils'; +import * as testUtils from '../utils'; + +// @ts-ignore: stub out method +identitiesUtils.browser = () => {}; + +describe('discover/get', () => { + const logger = new Logger('discover/get test', LogLevel.WARN, [ + new StreamHandler(), + ]); + const password = 'password'; + let dataDir: string; + let nodePath: string; + let pkAgent: PolykeyAgent; + let handleEvent: (evt) => Promise; + let processVertex: ( + parent: string | undefined, + children: Array, + ) => Promise; + beforeEach(async () => { + dataDir = await fs.promises.mkdtemp( + path.join(globalThis.tmpDir, 'polykey-test-'), + ); + // Set up the remote gestalt state here + // Setting up remote nodes + nodePath = path.join(dataDir, 'polykey'); + // Cannot use global shared agent since we need to register a provider + pkAgent = await PolykeyAgent.createPolykeyAgent({ + password, + options: { + nodePath, + agentServiceHost: '127.0.0.1', + clientServiceHost: '127.0.0.1', + keys: { + passwordOpsLimit: keysUtils.passwordOpsLimits.min, + passwordMemLimit: keysUtils.passwordMemLimits.min, + strictMemoryLock: false, + }, + }, + logger, + }); + + const audit = pkAgent.audit; + // @ts-ignore: kidnap protected + const handlerMap = audit.eventHandlerMap; + handleEvent = async (evt) => { + await handlerMap.get(evt.constructor)!.handler(evt); + }; + processVertex = async ( + parent: string | undefined, + children: Array, + ) => { + for (const child of children) { + await handleEvent( + new discoveryEvents.EventDiscoveryVertexQueued({ + detail: { + vertex: child as GestaltIdEncoded, + parent: parent as GestaltIdEncoded, + }, + }), + ); + } + if (parent != null) { + await handleEvent( + new discoveryEvents.EventDiscoveryVertexProcessed({ + detail: { + vertex: parent as GestaltIdEncoded, + }, + }), + ); + } + }; + }); + afterEach(async () => { + await pkAgent.stop(); + await fs.promises.rm(dataDir, { + force: true, + recursive: true, + }); + }); + test('should get all discovery events', async () => { + // Start of with mocking some existing discovery events + await processVertex(undefined, ['node-A']); + await processVertex('node-A', ['node-B', 'node-C']); + await processVertex('node-B', ['node-D']); + await processVertex('node-C', []); + await processVertex('node-D', []); + // Checking response + const discoverResponse = await testUtils.pkExec(['audit', 'discovery'], { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }); + expect(discoverResponse.stdout).toIncludeMultiple([ + 'queued', + 'processed', + 'node-A', + 'node-B', + 'node-C', + 'node-D', + ]); + expect(discoverResponse.exitCode).toBe(0); + }); + test('should get specific discovery events', async () => { + // Start of with mocking some existing discovery events + await processVertex(undefined, ['node-A']); + await processVertex('node-A', ['node-B', 'node-C']); + await processVertex('node-B', ['node-D']); + await processVertex('node-C', []); + await processVertex('node-D', []); + // Checking response + const discoverResponse1 = await testUtils.pkExec( + ['audit', 'discovery', '--discovery-events', 'queued'], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + expect(discoverResponse1.stdout).toIncludeMultiple([ + 'queued', + 'node-A', + 'node-B', + 'node-C', + 'node-D', + ]); + expect(discoverResponse1.stdout).not.toInclude('processed'); + expect(discoverResponse1.exitCode).toBe(0); + + const discoverResponse2 = await testUtils.pkExec( + ['audit', 'discovery', '--discovery-events', 'processed'], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + expect(discoverResponse2.stdout).toIncludeMultiple([ + 'processed', + 'node-A', + 'node-B', + 'node-C', + 'node-D', + ]); + expect(discoverResponse2.stdout).not.toInclude('queued'); + expect(discoverResponse2.exitCode).toBe(0); + + const discoverResponse3 = await testUtils.pkExec( + ['audit', 'discovery', '--discovery-events', 'processed', 'queued'], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + expect(discoverResponse3.stdout).toIncludeMultiple([ + 'processed', + 'queued', + 'node-A', + 'node-B', + 'node-C', + 'node-D', + ]); + expect(discoverResponse3.exitCode).toBe(0); + }); + test('should seek from seekStart', async () => { + // Start of with mocking some existing discovery events + await processVertex('node-A', ['node-AA']); + await processVertex('node-B', ['node-BA']); + await processVertex('node-C', ['node-CA']); + await sleep(50); + const date = new Date(); + await sleep(50); + await processVertex('node-E', ['node-EA']); + await processVertex('node-F', ['node-FA']); + await processVertex('node-G', ['node-GA']); + // Checking response + const discoverResponse1 = await testUtils.pkExec( + ['audit', 'discovery', '--seek-start', date.toISOString()], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + expect(discoverResponse1.stdout).not.toInclude('node-A'); + expect(discoverResponse1.stdout).not.toInclude('node-AA'); + expect(discoverResponse1.stdout).not.toInclude('node-B'); + expect(discoverResponse1.stdout).not.toInclude('node-BA'); + expect(discoverResponse1.stdout).not.toInclude('node-C'); + expect(discoverResponse1.stdout).not.toInclude('node-CA'); + expect(discoverResponse1.stdout).toIncludeMultiple([ + 'queued', + 'processed', + 'node-E', + 'node-EA', + 'node-F', + 'node-FA', + 'node-G', + 'node-GA', + ]); + expect(discoverResponse1.exitCode).toBe(0); + }); + test('should seek until seekEnd', async () => { + // Start of with mocking some existing discovery events + await processVertex('node-A', ['node-AA']); + await processVertex('node-B', ['node-BA']); + await processVertex('node-C', ['node-CA']); + await sleep(50); + const date = new Date(); + await sleep(50); + await processVertex('node-E', ['node-EA']); + await processVertex('node-F', ['node-FA']); + await processVertex('node-G', ['node-GA']); + // Checking response + const discoverResponse1 = await testUtils.pkExec( + ['audit', 'discovery', '--seek-end', date.toISOString()], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + expect(discoverResponse1.stdout).toIncludeMultiple([ + 'queued', + 'processed', + 'node-A', + 'node-AA', + 'node-B', + 'node-BA', + 'node-C', + 'node-CA', + ]); + expect(discoverResponse1.stdout).not.toInclude('node-E'); + expect(discoverResponse1.stdout).not.toInclude('node-EA'); + expect(discoverResponse1.stdout).not.toInclude('node-F'); + expect(discoverResponse1.stdout).not.toInclude('node-FA'); + expect(discoverResponse1.stdout).not.toInclude('node-G'); + expect(discoverResponse1.stdout).not.toInclude('node-GA'); + expect(discoverResponse1.exitCode).toBe(0); + }); + test('should seek until limit', async () => { + // Start of with mocking some existing discovery events + await processVertex(undefined, ['node-A']); + await processVertex(undefined, ['node-A']); + await processVertex(undefined, ['node-A']); + await processVertex(undefined, ['node-A']); + // Checking response + const discoverResponse = await testUtils.pkExec( + ['audit', 'discovery', '--limit', '2'], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + expect(discoverResponse.stdout).toIncludeRepeated('queued', 2); + expect(discoverResponse.stdout).toIncludeRepeated('node-A', 2); + expect(discoverResponse.exitCode).toBe(0); + }); + test('should await future events', async () => { + // Start of with mocking some existing discovery events + await processVertex('node-A', ['node-AA']); + await processVertex('node-B', ['node-BA']); + await processVertex('node-C', ['node-CA']); + await sleep(100); + const discoverResponseP = testUtils.pkExec( + ['audit', 'discovery', '--future-events', '--limit', '12'], + { + env: { + PK_NODE_PATH: nodePath, + PK_PASSWORD: password, + }, + cwd: dataDir, + }, + ); + await sleep(100); + await processVertex('node-E', ['node-EA']); + await processVertex('node-F', ['node-FA']); + await processVertex('node-G', ['node-GA']); + // Checking response + const discoverResponse = await discoverResponseP; + expect(discoverResponse.exitCode).toBe(0); + }); +});