Skip to content

Commit

Permalink
Merge pull request #730 from MatrixAI/feature-audit-multipath
Browse files Browse the repository at this point in the history
Adding multiple path support when requesting audit events
  • Loading branch information
tegefaulkes authored May 30, 2024
2 parents dc72c7c + c1669b6 commit 527a75a
Show file tree
Hide file tree
Showing 10 changed files with 429 additions and 93 deletions.
7 changes: 7 additions & 0 deletions src/audit/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type AuditEventToAuditEventSerialized<T extends AuditEvent> = Omit<T, 'id'> & {
id: AuditEventIdEncoded;
};

/**
* Equivalent to `Array<string>` but restricted to the available paths in `topicPaths`.
*/
type TopicPath = (typeof topicPaths)[number];

type TopicSubPath<T = TopicPath> =
Expand Down Expand Up @@ -158,6 +161,9 @@ type AuditEventDiscoveryCheckRediscovery = AuditEventBase<
typeof discoveryCheckRediscoveryTopicPath
>;

// @ts-ignore: recursive definition for defining a tree
type TopicPathTreeNode = Record<string, TopicPathTreeNode>;

// Metrics

type MetricPath = (typeof metricPaths)[number];
Expand Down Expand Up @@ -213,6 +219,7 @@ export type {
AuditEventDiscoveryVertexCulled,
AuditEventDiscoveryVertexCancelled,
AuditEventDiscoveryCheckRediscovery,
TopicPathTreeNode,
// Metric
MetricPath,
MetricPathToAuditMetric,
Expand Down
122 changes: 122 additions & 0 deletions src/audit/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import type {
AuditEventDiscoveryVertexCulled,
AuditEventDiscoveryVertexCancelled,
AuditEventDiscoveryCheckRediscovery,
TopicPathTreeNode,
} from './types';
import type * as nodesEvents from '../nodes/events';
import type * as discoveryEvents from '../discovery/events';
import type { AuditEventId } from '../ids';
import type { TopicPath } from './types';
import { IdInternal } from '@matrixai/id';
import * as sortableIdUtils from '@matrixai/id/dist/IdSortable';
import * as nodesUtils from '../nodes/utils';
Expand Down Expand Up @@ -187,6 +189,43 @@ const topicPaths = [
discoveryCheckRediscoveryTopicPath,
] as const;

/**
* Takes the list of `topicPaths` and converts it to a tree structure.
* This structure is much more efficient when checking if a path is a valid `TopicPath`.
*/
function generateTopicPathTree() {
const tree: TopicPathTreeNode = {};
for (const topicPath of topicPaths) {
let node: TopicPathTreeNode = tree;
for (const topicPathElement of topicPath) {
if (node[topicPathElement] == null) node[topicPathElement] = {};
node = node[topicPathElement];
}
}
return tree;
}

/**
* All the valid `topicPath`s condensed into a tree format.
* Used to quickly check if a path is valid.
*/
const topicPathTree = generateTopicPathTree();

/**
* TypeGuard used to assert if a value is a `TopicPath`.
* Uses `topicPathTree` to quickly check if the path is valid.
*/
function isTopicPath(it: unknown): it is TopicPath {
if (!Array.isArray(it)) return false;
let node = topicPathTree;
for (const pathElement of it) {
if (typeof pathElement !== 'string') return false;
if (node[pathElement] == null) return false;
node = node[pathElement];
}
return true;
}

// Metrics

// Nodes
Expand All @@ -211,6 +250,85 @@ const metricPaths = [
nodeConnectionOutboundMetricPath,
] as const;

/**
* Will take an array of dot path sorted paths and return the minimal list common paths.
* So sub-paths will be filtered out if we already contain a parent path E.G. `a.b` will be removed if we also include `a`.
* Duplicate paths will be removed, so `a` will be removed if two `a`'s exist.
*/
function filterSubPaths(paths: Array<Array<string>>): Array<Array<string>> {
let previous: string = '';
return paths
.map((v) => v.join('.'))
.sort()
.filter((value, index) => {
// Checking if the current value is included within the previous
if (index === 0 || !value.startsWith(previous)) {
previous = value;
return true;
}
return false;
}, {})
.map((v) => v.split('.'));
}

/**
* This takes N generators that yield data in a sorted order and combines their outputs in a fully sorted order.
* This will only work on pre-sorted outputs from the generator.
*/
async function* genSort<T>(
sortFn: (a: T, b: T) => number,
...gens: Array<AsyncGenerator<T, void, void>>
): AsyncGenerator<T, void, void> {
const heads: Array<{
value: T;
gen: AsyncGenerator<T, void, void>;
index: number;
}> = [];
// Seed the heads
let i = 0;
for (const gen of gens) {
const head = await gen.next();
if (!head.done) {
heads.push({
value: head.value,
gen,
index: i++,
});
}
}
if (heads.length === 0) return;

// Yield from heads until all iterators are done
let first = true;
let previous: T;
try {
while (true) {
// Sort them in order by the sortFn
heads.sort(({ value: a }, { value: b }) => sortFn(a, b));
// Yield the first in the order
const head = heads[0];
// Skip any duplicates
if (first || sortFn(previous!, head.value) !== 0) yield head.value;
first = false;
previous = head.value;
// Get the new head for that generator
const next = await head.gen.next();
// If the generator is done then we remove it from the heads, otherwise update the head value
if (next.done) {
heads.shift();
} else {
head.value = next.value;
}
// If the last head is done then we break
if (heads.length === 0) return;
}
} finally {
for (const { gen } of heads) {
await gen.return();
}
}
}

export {
extractFromSeek,
createAuditEventIdGenerator,
Expand All @@ -234,8 +352,12 @@ export {
fromEventDiscoveryCheckRediscovery,
nodeGraphTopicPath,
topicPaths,
topicPathTree,
isTopicPath,
nodeConnectionMetricPath,
nodeConnectionInboundMetricPath,
nodeConnectionOutboundMetricPath,
metricPaths,
filterSubPaths,
genSort,
};
30 changes: 0 additions & 30 deletions src/client/callers/auditEventsGet.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,12 @@
import type { ReadableStream } from 'stream/web';
import type { HandlerTypes } from '@matrixai/rpc';
import type { ContextTimedInput } from '@matrixai/contexts';
import type {
AuditEventToAuditEventSerialized,
TopicSubPath,
TopicSubPathToAuditEvent,
} from '../../audit/types';
import type { ClientRPCRequestParams, ClientRPCResponseResult } from '../types';
import type AuditEventsGet from '../handlers/AuditEventsGet';
import type { AuditEventIdEncoded } from '../../ids/types';
import { ServerCaller } from '@matrixai/rpc';

type CallerTypes = HandlerTypes<AuditEventsGet>;

type AuditEventsGetTypeOverride = <T extends TopicSubPath>(
input: ClientRPCRequestParams<{
seek?: AuditEventIdEncoded | number;
seekEnd?: AuditEventIdEncoded | number;
order?: 'asc' | 'desc';
limit?: number;
awaitFutureEvents?: boolean;
}> & {
path: T;
},
ctx?: ContextTimedInput,
) => Promise<
ReadableStream<
ClientRPCResponseResult<
AuditEventToAuditEventSerialized<TopicSubPathToAuditEvent<T>>
>
>
>;

const auditEventsGet = new ServerCaller<
CallerTypes['input'],
CallerTypes['output']
>();

export default auditEventsGet;

export type { AuditEventsGetTypeOverride };
82 changes: 53 additions & 29 deletions src/client/handlers/AuditEventsGet.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { ContextTimed } from '@matrixai/contexts';
import type { ClientRPCRequestParams, ClientRPCResponseResult } from '../types';
import type {
AuditEvent,
AuditEventSerialized,
AuditEventToAuditEventSerialized,
TopicSubPath,
TopicSubPathToAuditEvent,
TopicPath,
} from '../../audit/types';
import type { Audit } from '../../audit';
import type { AuditEventId, AuditEventIdEncoded } from '../../ids';
Expand All @@ -16,7 +16,7 @@ class AuditEventsGet extends ServerHandler<
audit: Audit;
},
ClientRPCRequestParams<{
path: TopicSubPath & Array<string>;
paths: Array<Array<string>>;
seek?: AuditEventIdEncoded | number;
seekEnd?: AuditEventIdEncoded | number;
order?: 'asc' | 'desc';
Expand All @@ -25,9 +25,9 @@ class AuditEventsGet extends ServerHandler<
}>,
ClientRPCResponseResult<AuditEventSerialized>
> {
public async *handle<T extends TopicSubPath>(
public async *handle(
{
path,
paths,
seek,
seekEnd,
order = 'asc',
Expand All @@ -40,18 +40,16 @@ class AuditEventsGet extends ServerHandler<
limit?: number;
awaitFutureEvents?: boolean;
}> & {
path: T;
paths: Array<Array<string>>;
},
_cancel,
_meta,
ctx: ContextTimed,
): AsyncGenerator<
ClientRPCResponseResult<
AuditEventToAuditEventSerialized<TopicSubPathToAuditEvent<T>>
>
ClientRPCResponseResult<AuditEventToAuditEventSerialized<AuditEvent>>
> {
const { audit } = this.container;
let iterator: AsyncGenerator<TopicSubPathToAuditEvent<T>>;
const iterators: Array<AsyncGenerator<AuditEvent>> = [];
let seek_: AuditEventId | number | undefined;
if (seek != null) {
seek_ =
Expand All @@ -64,28 +62,54 @@ class AuditEventsGet extends ServerHandler<
? auditUtils.decodeAuditEventId(seekEnd)
: seekEnd;
}
// If the call is descending chronologically, or does not want to await future events,
// it should not await future events.
if (!awaitFutureEvents || order === 'desc') {
iterator = audit.getAuditEvents(path, {
seek: seek_,
seekEnd: seekEnd_,
order: order,
limit: limit,
});
} else {
iterator = audit.getAuditEventsLongRunning(path, {
seek: seek_,
seekEnd: seekEnd_,
limit: limit,
});

// Convert the paths
const topicPaths: Array<TopicPath> = [];
for (const topicPath of auditUtils.filterSubPaths(paths)) {
if (auditUtils.isTopicPath(topicPath)) topicPaths.push(topicPath);
}

// Creating iterators for each `topicPath`
for (const topicPath of topicPaths) {
if (awaitFutureEvents) {
// If we're awaiting future events then we call `getAuditEventsLongRunning`, order is forced to `asc` in this case
const iterator = audit.getAuditEventsLongRunning(topicPath, {
seek: seek_,
seekEnd: seekEnd_,
limit: limit,
});
iterators.push(iterator);
} else {
// Otherwise we use the normal `getAuditEvents`
const iterator = audit.getAuditEvents(topicPath, {
seek: seek_,
seekEnd: seekEnd_,
order: order,
limit: limit,
});
iterators.push(iterator);
}
}

// We need to reverse the compare if we are descending in time
const orderSwitchMultiplier = awaitFutureEvents || order === 'asc' ? 1 : -1;
function sortFn(a: AuditEvent, b: AuditEvent) {
return Buffer.compare(a.id, b.id) * orderSwitchMultiplier;
}

const combinedIterator = auditUtils.genSort<AuditEvent>(
sortFn,
...iterators,
);
ctx.signal.addEventListener('abort', async () => {
await iterator.return(ctx.signal.reason);
await combinedIterator.return(ctx.signal.reason);
});
for await (const auditEvent of iterator) {
(auditEvent.id as any) = auditUtils.encodeAuditEventId(auditEvent.id);
yield auditEvent as any;
for await (const auditEvent of combinedIterator) {
yield {
id: auditUtils.encodeAuditEventId(auditEvent.id),
path: auditEvent.path,
data: auditEvent.data,
};
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/client/handlers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import AgentLockAll from './AgentLockAll';
import AgentStatus from './AgentStatus';
import AgentStop from './AgentStop';
import AgentUnlock from './AgentUnlock';
import AuditEventsGet from './AuditEventsGet';
import AuditMetricGet from './AuditMetricGet';
import GestaltsActionsGetByIdentity from './GestaltsActionsGetByIdentity';
import GestaltsActionsGetByNode from './GestaltsActionsGetByNode';
import GestaltsActionsSetByIdentity from './GestaltsActionsSetByIdentity';
Expand Down Expand Up @@ -89,8 +91,6 @@ import VaultsSecretsNewDir from './VaultsSecretsNewDir';
import VaultsSecretsRename from './VaultsSecretsRename';
import VaultsSecretsStat from './VaultsSecretsStat';
import VaultsVersion from './VaultsVersion';
import AuditEventsGet from './AuditEventsGet';
import AuditMetricGet from './AuditMetricGet';

/**
* Server manifest factory.
Expand Down Expand Up @@ -205,6 +205,8 @@ export {
AgentStatus,
AgentStop,
AgentUnlock,
AuditEventsGet,
AuditMetricGet,
GestaltsActionsGetByIdentity,
GestaltsActionsGetByNode,
GestaltsActionsSetByIdentity,
Expand Down
Loading

0 comments on commit 527a75a

Please sign in to comment.