Skip to content

Commit

Permalink
Merge pull request #475 from MatrixAI/feature-nodes_cancellability
Browse files Browse the repository at this point in the history
Updating the `Nodes` domain with `timedCancellable`
  • Loading branch information
tegefaulkes authored Oct 7, 2022
2 parents 06df14a + 69c0ffe commit e428206
Show file tree
Hide file tree
Showing 16 changed files with 638 additions and 411 deletions.
2 changes: 1 addition & 1 deletion src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ class PolykeyAgent {
await this.nodeManager.start();
await this.nodeConnectionManager.start({ nodeManager: this.nodeManager });
await this.nodeGraph.start({ fresh });
await this.nodeManager.syncNodeGraph(false);
await this.nodeManager.syncNodeGraph(false, 2000);
await this.discovery.start({ fresh });
await this.vaultManager.start({ fresh });
await this.notificationsManager.start({ fresh });
Expand Down
6 changes: 5 additions & 1 deletion src/contexts/decorators/cancellable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ function cancellable(lazy: boolean = false) {
let ctx: Partial<ContextCancellable> = args[contextIndex];
if (ctx === undefined) {
ctx = {};
args[contextIndex] = ctx;
} else {
// Copy the ctx into a new ctx object to avoid mutating the ctx in case
// it is used again
ctx = { ...ctx };
}
args[contextIndex] = ctx;
// Runtime type check on the context parameter
contextsUtils.checkContextCancellable(ctx, key, targetName);
return setupCancellable(
Expand Down
24 changes: 20 additions & 4 deletions src/contexts/decorators/timed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ function timed(
let ctx: Partial<ContextTimed> = args[contextIndex];
if (ctx === undefined) {
ctx = {};
args[contextIndex] = ctx;
} else {
// Copy the ctx into a new ctx object to avoid mutating the ctx in case
// it is used again
ctx = { ...ctx };
}
args[contextIndex] = ctx;
// Runtime type check on the context parameter
contextsUtils.checkContextTimed(ctx, key, targetName);
const teardownContext = setupTimedContext(
Expand All @@ -51,8 +55,12 @@ function timed(
let ctx: Partial<ContextTimed> = args[contextIndex];
if (ctx === undefined) {
ctx = {};
args[contextIndex] = ctx;
} else {
// Copy the ctx into a new ctx object to avoid mutating the ctx in case
// it is used again
ctx = { ...ctx };
}
args[contextIndex] = ctx;
// Runtime type check on the context parameter
contextsUtils.checkContextTimed(ctx, key, targetName);
const teardownContext = setupTimedContext(
Expand All @@ -71,8 +79,12 @@ function timed(
let ctx: Partial<ContextTimed> = args[contextIndex];
if (ctx === undefined) {
ctx = {};
args[contextIndex] = ctx;
} else {
// Copy the ctx into a new ctx object to avoid mutating the ctx in case
// it is used again
ctx = { ...ctx };
}
args[contextIndex] = ctx;
// Runtime type check on the context parameter
contextsUtils.checkContextTimed(ctx, key, targetName);
const teardownContext = setupTimedContext(
Expand All @@ -91,8 +103,12 @@ function timed(
let ctx: Partial<ContextTimed> = args[contextIndex];
if (ctx === undefined) {
ctx = {};
args[contextIndex] = ctx;
} else {
// Copy the ctx into a new ctx object to avoid mutating the ctx in case
// it is used again
ctx = { ...ctx };
}
args[contextIndex] = ctx;
// Runtime type check on the context parameter
contextsUtils.checkContextTimed(ctx, key, targetName);
const teardownContext = setupTimedContext(
Expand Down
6 changes: 5 additions & 1 deletion src/contexts/decorators/timedCancellable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ function timedCancellable(
let ctx: Partial<ContextTimed> = args[contextIndex];
if (ctx === undefined) {
ctx = {};
args[contextIndex] = ctx;
} else {
// Copy the ctx into a new ctx object to avoid mutating the ctx in case
// it is used again
ctx = { ...ctx };
}
args[contextIndex] = ctx;
// Runtime type check on the context parameter
contextsUtils.checkContextTimed(ctx, key, targetName);
const lazy_ = typeof lazy === 'boolean' ? lazy : lazy(this);
Expand Down
2 changes: 1 addition & 1 deletion src/contexts/functions/cancellable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ function cancellable<C extends ContextCancellable, P extends Array<any>, R>(
lazy: boolean = false,
): (...params: ContextAndParameters<C, P>) => PromiseCancellable<R> {
return (...params) => {
const ctx = params[0] ?? {};
const ctx = params[0] != null ? { ...params[0] } : {};
const args = params.slice(1) as P;
return setupCancellable(f, lazy, ctx, args);
};
Expand Down
8 changes: 4 additions & 4 deletions src/contexts/functions/timed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ function timed<C extends ContextTimed, P extends Array<any>>(
): (...params: ContextAndParameters<C, P>) => any {
if (f instanceof utils.AsyncFunction) {
return async (...params) => {
const ctx = params[0] ?? {};
const ctx = params[0] != null ? { ...params[0] } : {};
const args = params.slice(1) as P;
const teardownContext = setupTimedContext(
delay,
Expand All @@ -140,7 +140,7 @@ function timed<C extends ContextTimed, P extends Array<any>>(
};
} else if (f instanceof utils.GeneratorFunction) {
return function* (...params) {
const ctx = params[0] ?? {};
const ctx = params[0] != null ? { ...params[0] } : {};
const args = params.slice(1) as P;
const teardownContext = setupTimedContext(
delay,
Expand All @@ -155,7 +155,7 @@ function timed<C extends ContextTimed, P extends Array<any>>(
};
} else if (f instanceof utils.AsyncGeneratorFunction) {
return async function* (...params) {
const ctx = params[0] ?? {};
const ctx = params[0] != null ? { ...params[0] } : {};
const args = params.slice(1) as P;
const teardownContext = setupTimedContext(
delay,
Expand All @@ -170,7 +170,7 @@ function timed<C extends ContextTimed, P extends Array<any>>(
};
} else {
return (...params) => {
const ctx = params[0] ?? {};
const ctx = params[0] != null ? { ...params[0] } : {};
const args = params.slice(1) as P;
const teardownContext = setupTimedContext(
delay,
Expand Down
2 changes: 1 addition & 1 deletion src/contexts/functions/timedCancellable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ function timedCancellable<C extends ContextTimed, P extends Array<any>, R>(
errorTimeoutConstructor: new () => Error = contextsErrors.ErrorContextsTimedTimeOut,
): (...params: ContextAndParameters<C, P>) => PromiseCancellable<R> {
return (...params) => {
const ctx = params[0] ?? {};
const ctx = params[0] != null ? { ...params[0] } : {};
const args = params.slice(1) as P;
return setupTimedCancellable(
f,
Expand Down
6 changes: 5 additions & 1 deletion src/network/ConnectionForward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ class ConnectionForward extends Connection {
promise<void>();
// Promise for abortion and timeout
const { p: abortedP, resolveP: resolveAbortedP } = promise<void>();
ctx.signal.addEventListener('abort', () => resolveAbortedP());
if (ctx.signal.aborted) {
resolveAbortedP();
} else {
ctx.signal.addEventListener('abort', () => resolveAbortedP());
}
this.resolveReadyP = resolveReadyP;
this.utpSocket.on('message', this.handleMessage);
const handleStartError = (e) => {
Expand Down
135 changes: 85 additions & 50 deletions src/nodes/NodeConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import type { Certificate, PublicKey, PublicKeyPem } from '../keys/types';
import type Proxy from '../network/Proxy';
import type GRPCClient from '../grpc/GRPCClient';
import type NodeConnectionManager from './NodeConnectionManager';
import type { Timer } from '../types';
import type { ContextTimed } from '../contexts/types';
import type { PromiseCancellable } from '@matrixai/async-cancellable';
import Logger from '@matrixai/logger';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import * as asyncInit from '@matrixai/async-init';
import * as nodesErrors from './errors';
import { context, timedCancellable } from '../contexts/index';
import * as keysUtils from '../keys/utils';
import * as grpcErrors from '../grpc/errors';
import * as networkUtils from '../network/utils';
import { timerStart } from '../utils/index';

/**
* Encapsulates the unidirectional client-side connection of one node to another.
Expand All @@ -34,31 +37,59 @@ class NodeConnection<T extends GRPCClient> {
protected proxy: Proxy;
protected client: T;

static async createNodeConnection<T extends GRPCClient>({
targetNodeId,
targetHost,
targetPort,
targetHostname,
timer,
proxy,
keyManager,
clientFactory,
nodeConnectionManager,
destroyCallback = async () => {},
logger = new Logger(this.name),
}: {
targetNodeId: NodeId;
targetHost: Host;
targetPort: Port;
targetHostname?: Hostname;
timer?: Timer;
proxy: Proxy;
keyManager: KeyManager;
clientFactory: (...args) => Promise<T>;
nodeConnectionManager: NodeConnectionManager;
destroyCallback?: () => Promise<void>;
logger?: Logger;
}): Promise<NodeConnection<T>> {
static createNodeConnection<T extends GRPCClient>(
{
targetNodeId,
targetHost,
targetPort,
targetHostname,
proxy,
keyManager,
clientFactory,
nodeConnectionManager,
destroyCallback = async () => {},
logger = new Logger(this.name),
}: {
targetNodeId: NodeId;
targetHost: Host;
targetPort: Port;
targetHostname?: Hostname;
proxy: Proxy;
keyManager: KeyManager;
clientFactory: (...args) => Promise<T>;
nodeConnectionManager: NodeConnectionManager;
destroyCallback?: () => Promise<void>;
logger?: Logger;
},
ctx?: Partial<ContextTimed>,
): PromiseCancellable<NodeConnection<T>>;
@timedCancellable(true, 20000)
static async createNodeConnection<T extends GRPCClient>(
{
targetNodeId,
targetHost,
targetPort,
targetHostname,
proxy,
keyManager,
clientFactory,
nodeConnectionManager,
destroyCallback = async () => {},
logger = new Logger(this.name),
}: {
targetNodeId: NodeId;
targetHost: Host;
targetPort: Port;
targetHostname?: Hostname;
proxy: Proxy;
keyManager: KeyManager;
clientFactory: (...args) => Promise<T>;
nodeConnectionManager: NodeConnectionManager;
destroyCallback?: () => Promise<void>;
logger?: Logger;
},
@context ctx: ContextTimed,
): Promise<NodeConnection<T>> {
logger.info(`Creating ${this.name}`);
// Checking if attempting to connect to a wildcard IP
if (networkUtils.isHostWildcard(targetHost)) {
Expand Down Expand Up @@ -91,45 +122,48 @@ class NodeConnection<T extends GRPCClient> {
destroyCallback,
logger,
});
let client;
let client: T;
try {
// Start the hole punching only if we are not connecting to seed nodes
let holePunchPromises: Promise<void>[] = [];
const seedNodes = nodeConnectionManager.getSeedNodes();
const isSeedNode = !!seedNodes.find((nodeId) => {
return nodeId.equals(targetNodeId);
});
if (!isSeedNode) {
holePunchPromises = Array.from(seedNodes, (nodeId) => {
// FIXME: this needs to be cancellable.
// It needs to timeout as well as abort for cleanup
void Array.from(seedNodes, (nodeId) => {
return nodeConnectionManager.sendHolePunchMessage(
nodeId,
keyManager.getNodeId(),
targetNodeId,
proxyAddress,
signature,
ctx,
);
});
}
[client] = await Promise.all([
clientFactory({
nodeId: targetNodeId,
host: targetHost,
port: targetPort,
proxyConfig: proxyConfig,
// Think about this
logger: logger.getChild(clientFactory.name),
destroyCallback: async () => {
if (
nodeConnection[asyncInit.status] !== 'destroying' &&
!nodeConnection[asyncInit.destroyed]
) {
await nodeConnection.destroy();
}
},
timer: timer,
}),
...holePunchPromises,
]);
// TODO: this needs to be updated to take a context,
// still uses old timer style.
client = await clientFactory({
nodeId: targetNodeId,
host: targetHost,
port: targetPort,
proxyConfig: proxyConfig,
// Think about this
logger: logger.getChild(clientFactory.name),
destroyCallback: async () => {
if (
nodeConnection[asyncInit.status] !== 'destroying' &&
!nodeConnection[asyncInit.destroyed]
) {
await nodeConnection.destroy();
}
},
// FIXME: this needs to be replaced with
// the GRPC timerCancellable update
timer: timerStart(ctx.timer.getTimeout()),
});
// 5. When finished, you have a connection to other node
// The GRPCClient is ready to be used for requests
} catch (e) {
Expand All @@ -142,8 +176,9 @@ class NodeConnection<T extends GRPCClient> {
}
throw e;
}
// FIXME: we need a finally block here to do cleanup.
// TODO: This is due to chicken or egg problem
// see if we can move to CreateDestroyStartStop to resolve this
// see if we can move to CreateDestroyStartStop to resolve this
nodeConnection.client = client;
logger.info(`Created ${this.name}`);
return nodeConnection;
Expand Down
Loading

0 comments on commit e428206

Please sign in to comment.