Skip to content

Commit

Permalink
feat: adding auditEventMultiPathGet handler to support getting audi…
Browse files Browse the repository at this point in the history
…t events while selecting multiple paths

[ci skip]
  • Loading branch information
tegefaulkes committed May 29, 2024
1 parent dc72c7c commit 4705bc9
Show file tree
Hide file tree
Showing 9 changed files with 704 additions and 24 deletions.
108 changes: 108 additions & 0 deletions src/audit/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type {
import type * as nodesEvents from '../nodes/events';
import type * as discoveryEvents from '../discovery/events';
import type { AuditEventId } from '../ids';
import type { TopicPath } from './types';
import { IdInternal } from '@matrixai/id';
import * as sortableIdUtils from '@matrixai/id/dist/IdSortable';
import * as nodesUtils from '../nodes/utils';
Expand Down Expand Up @@ -187,6 +188,34 @@ const topicPaths = [
discoveryCheckRediscoveryTopicPath,
] as const;

// @ts-ignore: recursive definition for defining a tree
type TopicPathTreeNode = Record<string, TopicPathTreeNode>;

function generateTopicPathTree() {
const tree: TopicPathTreeNode = {};
for (const topicPath of topicPaths) {
let node: TopicPathTreeNode = tree;
for (const topicPathElement of topicPath) {
if (node[topicPathElement] == null) node[topicPathElement] = {};
node = node[topicPathElement];
}
}
return tree;
}

const topicPathTree = generateTopicPathTree();

function isTopicPath(it: unknown): it is TopicPath {
if (!Array.isArray(it)) return false;
let node = topicPathTree;
for (const pathElement of it) {
if (typeof pathElement !== 'string') return false;
if (node[pathElement] == null) return false;
node = node[pathElement];
}
return true;
}

// Metrics

// Nodes
Expand All @@ -211,6 +240,81 @@ const metricPaths = [
nodeConnectionOutboundMetricPath,
] as const;

/**
* Will take an array of dot path sorted paths and return the minimal list common paths.
* So sub-paths will be filtered out if we already contain a parent path E.G. `a.b` will be removed if we also include `a`.
* Duplicate paths will be removed, so `a` will be removed if two `a`'s exist.
*/
function filterSubPaths(paths: Array<string>): Array<string> {
let previous: string = '';
return paths.sort().filter((value, index) => {
// Checking if the current value is included within the previous
if (index === 0 || !value.startsWith(previous)) {
previous = value;
return true;
}
return false;
}, {});
}

/**
* This takes N generators that yield data in a sorted order and combines their outputs in a fully sorted order.
* This will only work on pre-sorted outputs from the generator.
*/
async function* genSort<T>(
sortFn: (a: T, b: T) => number,
...gens: Array<AsyncGenerator<T, void, void>>
): AsyncGenerator<T, void, void> {
const heads: Array<{
value: T;
gen: AsyncGenerator<T, void, void>;
index: number;
}> = [];
// Seed the heads
let i = 0;
for (const gen of gens) {
const head = await gen.next();
if (!head.done) {
heads.push({
value: head.value,
gen,
index: i++,
});
}
}
if (heads.length === 0) return;

// Yield from heads until all iterators are done
let first = true;
let previous: T;
try {
while (true) {
// Sort them in order by the sortFn
heads.sort(({ value: a }, { value: b }) => sortFn(a, b));
// Yield the first in the order
const head = heads[0];
// Skip any duplicates
if (first || sortFn(previous!, head.value) !== 0) yield head.value;
first = false;
previous = head.value;
// Get the new head for that generator
const next = await head.gen.next();
// If the generator is done then we remove it from the heads, otherwise update the head value
if (next.done) {
heads.shift();
} else {
head.value = next.value;
}
// If the last head is done then we break
if (heads.length === 0) return;
}
} finally {
for (const { gen } of heads) {
await gen.return();
}
}
}

export {
extractFromSeek,
createAuditEventIdGenerator,
Expand All @@ -234,8 +338,12 @@ export {
fromEventDiscoveryCheckRediscovery,
nodeGraphTopicPath,
topicPaths,
topicPathTree,
isTopicPath,
nodeConnectionMetricPath,
nodeConnectionInboundMetricPath,
nodeConnectionOutboundMetricPath,
metricPaths,
filterSubPaths,
genSort,
};
12 changes: 12 additions & 0 deletions src/client/callers/auditEventsMultiPathGet.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { HandlerTypes } from '@matrixai/rpc';
import type AuditEventsMultiPathGet from '../handlers/AuditEventsMultiPathGet';
import { ServerCaller } from '@matrixai/rpc';

type CallerTypes = HandlerTypes<AuditEventsMultiPathGet>;

const auditEventsMultiPathGet = new ServerCaller<
CallerTypes['input'],
CallerTypes['output']
>();

export default auditEventsMultiPathGet;
2 changes: 2 additions & 0 deletions src/client/callers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import agentStatus from './agentStatus';
import agentStop from './agentStop';
import agentUnlock from './agentUnlock';
import auditEventsGet from './auditEventsGet';
import auditEventsMultiPathGet from './auditEventsMultiPathGet';
import auditMetricGet from './auditMetricGet';
import gestaltsActionsGetByIdentity from './gestaltsActionsGetByIdentity';
import gestaltsActionsGetByNode from './gestaltsActionsGetByNode';
Expand Down Expand Up @@ -84,6 +85,7 @@ const clientManifest = {
agentStop,
agentUnlock,
auditEventsGet,
auditEventsMultiPathGet,
auditMetricGet,
gestaltsActionsGetByIdentity,
gestaltsActionsGetByNode,
Expand Down
117 changes: 117 additions & 0 deletions src/client/handlers/AuditEventsMultiPathGet.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import type { ContextTimed } from '@matrixai/contexts';
import type { ClientRPCRequestParams, ClientRPCResponseResult } from '../types';
import type {
AuditEvent,
AuditEventSerialized,
AuditEventToAuditEventSerialized,
TopicPath,
} from '../../audit/types';
import type { Audit } from '../../audit';
import type { AuditEventId, AuditEventIdEncoded } from '../../ids';
import { ServerHandler } from '@matrixai/rpc';
import * as auditUtils from '../../audit/utils';

class AuditEventsGet extends ServerHandler<
{
audit: Audit;
},
ClientRPCRequestParams<{
paths: Array<string>;
seek?: AuditEventIdEncoded | number;
seekEnd?: AuditEventIdEncoded | number;
order?: 'asc' | 'desc';
limit?: number;
awaitFutureEvents?: boolean;
}>,
ClientRPCResponseResult<AuditEventSerialized>
> {
public async *handle(
{
paths,
seek,
seekEnd,
order = 'asc',
limit,
awaitFutureEvents = false,
}: ClientRPCRequestParams<{
seek?: AuditEventIdEncoded | number;
seekEnd?: AuditEventIdEncoded | number;
order?: 'asc' | 'desc';
limit?: number;
awaitFutureEvents?: boolean;
}> & {
paths: Array<string>;
},
_cancel,
_meta,
ctx: ContextTimed,
): AsyncGenerator<
ClientRPCResponseResult<AuditEventToAuditEventSerialized<AuditEvent>>
> {
const { audit } = this.container;
const iterators: Array<AsyncGenerator<AuditEvent>> = [];
let seek_: AuditEventId | number | undefined;
if (seek != null) {
seek_ =
typeof seek === 'string' ? auditUtils.decodeAuditEventId(seek) : seek;
}
let seekEnd_: AuditEventId | number | undefined;
if (seekEnd != null) {
seekEnd_ =
typeof seekEnd === 'string'
? auditUtils.decodeAuditEventId(seekEnd)
: seekEnd;
}

// Convert the paths
const topicPaths: Array<TopicPath> = [];
for (const filteredPath of auditUtils.filterSubPaths(paths)) {
const topicPath = filteredPath.split('.');
if (auditUtils.isTopicPath(topicPath)) topicPaths.push(topicPath);
}

// If the call is descending chronologically, or does not want to await future events,
// it should not await future events.
for (const topicPath of topicPaths) {
if (!awaitFutureEvents) {
const iterator = audit.getAuditEvents(topicPath, {
seek: seek_,
seekEnd: seekEnd_,
order: order,
limit: limit,
});
iterators.push(iterator);
} else {
const iterator = audit.getAuditEventsLongRunning(topicPath, {
seek: seek_,
seekEnd: seekEnd_,
limit: limit,
});
iterators.push(iterator);
}
}

// We need to reverse the compare if we are descending in time
const orderSwitchMultiplier = awaitFutureEvents || order === 'asc' ? 1 : -1;
function sortFn(a: AuditEvent, b: AuditEvent) {
return Buffer.compare(a.id, b.id) * orderSwitchMultiplier;
}

const combinedIterator = auditUtils.genSort<AuditEvent>(
sortFn,
...iterators,
);
ctx.signal.addEventListener('abort', async () => {
await combinedIterator.return(ctx.signal.reason);
});
for await (const auditEvent of combinedIterator) {
yield {
id: auditUtils.encodeAuditEventId(auditEvent.id),
path: auditEvent.path,
data: auditEvent.data,
};
}
}
}

export default AuditEventsGet;
9 changes: 7 additions & 2 deletions src/client/handlers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import AgentLockAll from './AgentLockAll';
import AgentStatus from './AgentStatus';
import AgentStop from './AgentStop';
import AgentUnlock from './AgentUnlock';
import AuditEventsGet from './AuditEventsGet';
import AuditEventsMultiPathGet from './AuditEventsMultiPathGet';
import AuditMetricGet from './AuditMetricGet';
import GestaltsActionsGetByIdentity from './GestaltsActionsGetByIdentity';
import GestaltsActionsGetByNode from './GestaltsActionsGetByNode';
import GestaltsActionsSetByIdentity from './GestaltsActionsSetByIdentity';
Expand Down Expand Up @@ -89,8 +92,6 @@ import VaultsSecretsNewDir from './VaultsSecretsNewDir';
import VaultsSecretsRename from './VaultsSecretsRename';
import VaultsSecretsStat from './VaultsSecretsStat';
import VaultsVersion from './VaultsVersion';
import AuditEventsGet from './AuditEventsGet';
import AuditMetricGet from './AuditMetricGet';

/**
* Server manifest factory.
Expand Down Expand Up @@ -120,6 +121,7 @@ const serverManifest = (container: {
agentStop: new AgentStop(container),
agentUnlock: new AgentUnlock(container),
auditEventsGet: new AuditEventsGet(container),
auditEventsMultiPathGet: new AuditEventsMultiPathGet(container),
auditMetricGet: new AuditMetricGet(container),
gestaltsActionsGetByIdentity: new GestaltsActionsGetByIdentity(container),
gestaltsActionsGetByNode: new GestaltsActionsGetByNode(container),
Expand Down Expand Up @@ -205,6 +207,9 @@ export {
AgentStatus,
AgentStop,
AgentUnlock,
AuditEventsGet,
AuditEventsMultiPathGet,
AuditMetricGet,
GestaltsActionsGetByIdentity,
GestaltsActionsGetByNode,
GestaltsActionsSetByIdentity,
Expand Down
Loading

0 comments on commit 4705bc9

Please sign in to comment.