From 06459e36e0b5e76e32342298d853bd6b61767877 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Thu, 25 May 2023 13:54:53 +1000 Subject: [PATCH] wip: converting `NodeConnection` * Related #512 * Related #495 * Related #234 [ci skip] --- src/nodes/NodeConnection.ts | 175 +++++++----------- src/nodes/NodeConnectionManager.ts | 89 ++++----- src/nodes/types.ts | 17 ++ .../GRPCClientAgent.test.ts | 0 .../service/nodesChainDataGet.test.ts | 0 .../service/nodesClosestLocalNode.test.ts | 0 .../service/nodesCrossSignClaim.test.ts | 0 .../service/nodesHolePunchMessage.test.ts | 0 .../service/notificationsSend.test.ts | 0 tests/{agent => agent-old}/utils.ts | 0 10 files changed, 113 insertions(+), 168 deletions(-) rename tests/{agent => agent-old}/GRPCClientAgent.test.ts (100%) rename tests/{agent => agent-old}/service/nodesChainDataGet.test.ts (100%) rename tests/{agent => agent-old}/service/nodesClosestLocalNode.test.ts (100%) rename tests/{agent => agent-old}/service/nodesCrossSignClaim.test.ts (100%) rename tests/{agent => agent-old}/service/nodesHolePunchMessage.test.ts (100%) rename tests/{agent => agent-old}/service/notificationsSend.test.ts (100%) rename tests/{agent => agent-old}/utils.ts (100%) diff --git a/src/nodes/NodeConnection.ts b/src/nodes/NodeConnection.ts index df1add3fcb..2deba2ca13 100644 --- a/src/nodes/NodeConnection.ts +++ b/src/nodes/NodeConnection.ts @@ -1,26 +1,27 @@ import type { NodeId } from './types'; import type { Host, Hostname, Port } from '../network/types'; import type { Certificate } from '../keys/types'; -import type Proxy from '../network/Proxy'; -import type GRPCClient from '../grpc/GRPCClient'; import type { ContextTimed } from '../contexts/types'; import type { PromiseCancellable } from '@matrixai/async-cancellable'; +import type { ClientManifest } from '@/rpc/types'; +import type { Host as QUICHost, Port as QUICPort } from '@matrixai/quic'; +import type { QUICClientConfig } from './types'; import Logger from '@matrixai/logger'; import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy'; -import * as asyncInit from '@matrixai/async-init'; +import { QUICClient } from '@matrixai/quic'; +import RPCClient from '@/rpc/RPCClient'; import * as nodesErrors from './errors'; import { context, timedCancellable } from '../contexts/index'; -import * as grpcErrors from '../grpc/errors'; import * as networkUtils from '../network/utils'; -import { timerStart } from '../utils/index'; +import * as rpcUtils from '../rpc/utils'; +// TODO: extend an event system, use events for cleaning up. /** * Encapsulates the unidirectional client-side connection of one node to another. */ -// eslint-disable-next-line @typescript-eslint/no-unused-vars -- False positive for T -interface NodeConnection extends CreateDestroy {} +interface NodeConnection extends CreateDestroy {} @CreateDestroy() -class NodeConnection { +class NodeConnection { public readonly host: Host; public readonly port: Port; /** @@ -30,124 +31,84 @@ class NodeConnection { public readonly hostname?: Hostname; protected logger: Logger; - protected destroyCallback: () => Promise; - protected proxy: Proxy; - protected client: T; + protected quicClient: QUICClient; + protected rpcClient: RPCClient; - static createNodeConnection( + static createNodeConnection( { targetNodeId, targetHost, targetPort, targetHostname, - proxy, - clientFactory, - destroyCallback, - destroyTimeout, + quicClientConfig, + manifest, logger, }: { targetNodeId: NodeId; targetHost: Host; targetPort: Port; targetHostname?: Hostname; - proxy: Proxy; - clientFactory: (...args) => Promise; - destroyCallback?: () => Promise; - destroyTimeout?: number; + quicClientConfig: QUICClientConfig; + manifest: M; logger?: Logger; }, ctx?: Partial, - ): PromiseCancellable>; + ): PromiseCancellable>; @timedCancellable(true, 20000) - static async createNodeConnection( + static async createNodeConnection( { targetNodeId, targetHost, targetPort, targetHostname, - proxy, - clientFactory, - destroyCallback = async () => {}, - destroyTimeout = 2000, + quicClientConfig, + manifest, logger = new Logger(this.name), }: { targetNodeId: NodeId; targetHost: Host; targetPort: Port; targetHostname?: Hostname; - proxy: Proxy; - clientFactory: (...args) => Promise; - destroyCallback?: () => Promise; - destroyTimeout?: number; + quicClientConfig: QUICClientConfig; + manifest: M; logger?: Logger; }, @context ctx: ContextTimed, - ): Promise> { + ): Promise> { logger.info(`Creating ${this.name}`); // Checking if attempting to connect to a wildcard IP if (networkUtils.isHostWildcard(targetHost)) { throw new nodesErrors.ErrorNodeConnectionHostWildcard(); } - const proxyConfig = { - host: proxy.getForwardHost(), - port: proxy.getForwardPort(), - authToken: proxy.authToken, - }; - // 1. Ask fwdProxy for connection to target (the revProxy of other node) - // 2. Start sending hole-punching packets to the target (via the client start - - // this establishes a HTTP CONNECT request with the forward proxy) - // 3. Relay the proxy port to the broker/s (such that they can inform the other node) - // 4. Start sending hole-punching packets to other node (done in openConnection()) - // Done in parallel - const nodeConnection = new this({ + // TODO: this needs to be updated to take a context, + // still uses old timer style. + const clientLogger = logger.getChild(RPCClient.name); + // TODO: Custom TLS validation with NodeId + // TODO: Idle timeout and connection timeout is the same thing from the `quic` perspective. + // THis means we need to race our timeout timer + const quicClient = await QUICClient.createQUICClient({ + host: targetHost as unknown as QUICHost, // FIXME: better type conversion? + port: targetPort as unknown as QUICPort, // FIXME: better type conversion? + ...quicClientConfig, + logger: logger.getChild(QUICClient.name), + }); + const rpcClient = await RPCClient.createRPCClient({ + manifest, + middlewareFactory: rpcUtils.defaultClientMiddlewareWrapper(), + streamFactory: () => { + return quicClient.connection.streamNew(); + }, + logger: clientLogger, + }); + const nodeConnection = new this({ host: targetHost, port: targetPort, hostname: targetHostname, - proxy: proxy, - destroyCallback, + quicClient, + rpcClient, logger, }); - let client: T; - try { - // TODO: this needs to be updated to take a context, - // still uses old timer style. - const clientLogger = logger.getChild(clientFactory.name); - client = await clientFactory({ - nodeId: targetNodeId, - host: targetHost, - port: targetPort, - proxyConfig: proxyConfig, - // Think about this - logger: clientLogger, - destroyCallback: async () => { - clientLogger.debug(`GRPC client triggered destroyedCallback`); - if ( - nodeConnection[asyncInit.status] !== 'destroying' && - !nodeConnection[asyncInit.destroyed] - ) { - await nodeConnection.destroy({ timeout: destroyTimeout }); - } - }, - // FIXME: this needs to be replaced with - // the GRPC timerCancellable update - timer: timerStart(ctx.timer.getTimeout()), - }); - // 5. When finished, you have a connection to other node - // The GRPCClient is ready to be used for requests - } catch (e) { - await nodeConnection.destroy({ timeout: destroyTimeout }); - // If the connection times out, re-throw this with a higher level nodes exception - if (e instanceof grpcErrors.ErrorGRPCClientTimeout) { - throw new nodesErrors.ErrorNodeConnectionTimeout(e.message, { - cause: e, - }); - } - throw e; - } - // FIXME: we need a finally block here to do cleanup. - // TODO: This is due to chicken or egg problem - // see if we can move to CreateDestroyStartStop to resolve this - nodeConnection.client = client; + nodeConnection.rpcClient = rpcClient; logger.info(`Created ${this.name}`); return nodeConnection; } @@ -156,48 +117,43 @@ class NodeConnection { host, port, hostname, - proxy, - destroyCallback, + quicClient, + rpcClient, logger, }: { host: Host; port: Port; hostname?: Hostname; - proxy: Proxy; - destroyCallback: () => Promise; + quicClient: QUICClient; + rpcClient: RPCClient; logger: Logger; }) { this.logger = logger; this.host = host; this.port = port; this.hostname = hostname; - this.proxy = proxy; - this.destroyCallback = destroyCallback; + this.quicClient = quicClient; + this.rpcClient = rpcClient; } public async destroy({ - timeout, + force, }: { - timeout?: number; + force?: boolean; } = {}) { this.logger.info(`Destroying ${this.constructor.name}`); - if ( - this.client != null && - this.client[asyncInit.status] !== 'destroying' && - !this.client[asyncInit.destroyed] - ) { - await this.client.destroy({ timeout }); - } - this.logger.debug(`${this.constructor.name} triggered destroyedCallback`); - await this.destroyCallback(); + await this.quicClient.destroy({ force }); + await this.rpcClient.destroy(); + this.logger.debug(`${this.constructor.name} triggered destroyed event`); + // TODO: trigger destroy event this.logger.info(`Destroyed ${this.constructor.name}`); } /** * Gets GRPCClient for this node connection */ - public getClient(): T { - return this.client; + public getClient(): RPCClient { + return this.rpcClient; } /** @@ -207,11 +163,10 @@ class NodeConnection { */ @ready(new nodesErrors.ErrorNodeConnectionDestroyed()) public getRootCertChain(): Array { - const connInfo = this.proxy.getConnectionInfoByProxy(this.host, this.port); - if (connInfo == null) { - throw new nodesErrors.ErrorNodeConnectionInfoNotExist(); - } - return connInfo.remoteCertificates; + const connInfo = this.quicClient.connection.remoteInfo; + // TODO: + throw Error('TMP IMP'); + // Return connInfo.remoteCertificates; } } diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index 85c7c55f90..ea745494a0 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -1,6 +1,5 @@ import type { ResourceAcquire } from '@matrixai/resources'; import type KeyRing from '../keys/KeyRing'; -import type Proxy from '../network/Proxy'; import type { Host, Hostname, Port } from '../network/types'; import type NodeGraph from './NodeGraph'; import type TaskManager from '../tasks/TaskManager'; @@ -26,17 +25,19 @@ import NodeConnection from './NodeConnection'; import * as nodesUtils from './utils'; import * as nodesErrors from './errors'; import { context, timedCancellable } from '../contexts'; -import GRPCClientAgent from '../agent/GRPCClientAgent'; import * as validationUtils from '../validation/utils'; import * as networkUtils from '../network/utils'; import * as nodesPB from '../proto/js/polykey/v1/nodes/nodes_pb'; import { never, promise } from '../utils'; import { resolveHostnames } from '../network/utils'; +import { clientManifest as agentClientManifest } from '../agent/handlers/clientManifest'; // TODO: check all locking and add cancellation for it. +type AgentClientManifest = typeof clientManifest; + type ConnectionAndTimer = { - connection: NodeConnection; + connection: NodeConnection; timer: Timer | null; usageCount: number; }; @@ -68,7 +69,6 @@ class NodeConnectionManager { protected logger: Logger; protected nodeGraph: NodeGraph; protected keyRing: KeyRing; - protected proxy: Proxy; protected taskManager: TaskManager; // NodeManager has to be passed in during start to allow co-dependency protected nodeManager: NodeManager | undefined; @@ -92,12 +92,10 @@ class NodeConnectionManager { > = new Map(); protected backoffDefault: number = 300; // 5 min protected backoffMultiplier: number = 2; // Doubles every failure - protected ncDestroyTimeout: number; public constructor({ keyRing, nodeGraph, - proxy, taskManager, seedNodes = {}, initialClosestNodes = 3, @@ -109,7 +107,6 @@ class NodeConnectionManager { }: { nodeGraph: NodeGraph; keyRing: KeyRing; - proxy: Proxy; taskManager: TaskManager; seedNodes?: SeedNodes; initialClosestNodes?: number; @@ -122,7 +119,6 @@ class NodeConnectionManager { this.logger = logger ?? new Logger(NodeConnectionManager.name); this.keyRing = keyRing; this.nodeGraph = nodeGraph; - this.proxy = proxy; this.taskManager = taskManager; const localNodeIdEncoded = nodesUtils.encodeNodeId(keyRing.getNodeId()); delete seedNodes[localNodeIdEncoded]; @@ -175,7 +171,7 @@ class NodeConnectionManager { public async acquireConnection( targetNodeId: NodeId, ctx?: Partial, - ): Promise>> { + ): Promise>> { if (this.keyRing.getNodeId().equals(targetNodeId)) { this.logger.warn('Attempting connection to our own NodeId'); } @@ -221,7 +217,7 @@ class NodeConnectionManager { */ public withConnF( targetNodeId: NodeId, - f: (conn: NodeConnection) => Promise, + f: (conn: NodeConnection) => Promise, ctx?: Partial, ): PromiseCancellable; @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) @@ -232,7 +228,7 @@ class NodeConnectionManager { ) public async withConnF( targetNodeId: NodeId, - f: (conn: NodeConnection) => Promise, + f: (conn: NodeConnection) => Promise, @context ctx: ContextTimed, ): Promise { return await withF( @@ -254,7 +250,7 @@ class NodeConnectionManager { public async *withConnG( targetNodeId: NodeId, g: ( - conn: NodeConnection, + conn: NodeConnection, ) => AsyncGenerator, ctx?: Partial, ): AsyncGenerator { @@ -352,27 +348,23 @@ class NodeConnectionManager { const nodeConnectionProms = targetAddresses.map((address, index) => { const destroyCallbackProm = promise(); destroyCallbackProms[index] = destroyCallbackProm; - const nodeConnectionProm = NodeConnection.createNodeConnection( - { - targetNodeId: targetNodeId, - targetHost: address.host, - targetHostname: targetHostname, - targetPort: address.port, - proxy: this.proxy, - destroyTimeout: this.ncDestroyTimeout, - destroyCallback: async () => { - destroyCallbackProm.resolveP(); - }, - logger: this.logger.getChild( - `${NodeConnection.name} [${nodesUtils.encodeNodeId( - targetNodeId, - )}@${address.host}:${address.port}]`, - ), - clientFactory: async (args) => - GRPCClientAgent.createGRPCClientAgent(args), + const nodeConnectionProm = NodeConnection.createNodeConnection({ + destroyCallback(): Promise { + return Promise.resolve(undefined); }, - ctx, - ); + destroyTimeout: 0, + manifest: agentClientManifest, + quicClientConfig: {}, + targetHost: address.host, + targetPort: address.port, + targetHostname: targetHostname, + targetNodeId: targetNodeId, + logger: this.logger.getChild( + `${NodeConnection.name} [${nodesUtils.encodeNodeId( + targetNodeId, + )}@${address.host}:${address.port}]`, + ), + }); void nodeConnectionProm.then( () => firstConnectionIndexProm.resolveP(index), (e) => { @@ -389,7 +381,7 @@ class NodeConnectionManager { ); return nodeConnectionProm; }); - let newConnection: NodeConnection; + let newConnection: NodeConnection; try { newConnection = await Promise.any(nodeConnectionProms); } catch (e) { @@ -417,7 +409,7 @@ class NodeConnectionManager { if (index === successfulIndex) return; nodeConnectionProm.cancel(cleanUpReason); return nodeConnectionProm.then(async (nodeConnection) => { - await nodeConnection.destroy({ timeout: this.ncDestroyTimeout }); + await nodeConnection.destroy(); // TODO: force? }); }), ); @@ -504,30 +496,8 @@ class NodeConnectionManager { proxyPort: Port, ctx?: Partial, ): PromiseCancellable { - return this.proxy.openConnectionReverse(proxyHost, proxyPort, ctx); - } - - /** - * Treat this node as the client. - * Instruct the forward proxy to send hole-punching packets back to the target's - * reverse proxy, in order to open a connection from this client to the server. - * A connection is established if the client node's reverse proxy is sending - * hole punching packets at the same time as this node (acting as the client) - * sends hole-punching packets back to the server's reverse proxy. - * This is not needed to be called when doing hole punching since the - * ForwardProxy automatically starts the process. - * @param nodeId Node ID of the node we are connecting to - * @param proxyHost Proxy host of the reverse proxy - * @param proxyPort Proxy port of the reverse proxy - * @param ctx - */ - public async holePunchForward( - nodeId: NodeId, - proxyHost: Host, - proxyPort: Port, - ctx?: ContextTimed, - ): Promise { - await this.proxy.openConnectionForward([nodeId], proxyHost, proxyPort, ctx); + // TODO: tell the server to hole punch reverse + throw Error('TMP IMP'); } /** @@ -915,6 +885,9 @@ class NodeConnectionManager { }); } + // FIXME: How do we handle pinging now? Previously this was done on the proxy level. + // Now I think we need to actually establish a connection. Pinging should just be establishing a connection now. + // I'll keep this but have it wrap normal connection establishment. /** * Checks if a connection can be made to the target. Returns true if the * connection can be authenticated, it's certificate matches the nodeId and diff --git a/src/nodes/types.ts b/src/nodes/types.ts index 41eb082b35..7442c0509a 100644 --- a/src/nodes/types.ts +++ b/src/nodes/types.ts @@ -1,5 +1,8 @@ import type { NodeId, NodeIdString, NodeIdEncoded } from '../ids/types'; import type { Host, Hostname, Port } from '../network/types'; +import type { Crypto } from '@matrixai/quic'; +import type { Host as QUICHost, Port as QUICPort } from '@matrixai/quic'; +import type { QUICConfig } from '@matrixai/quic/dist/config'; /** * Key indicating which space the NodeGraph is in @@ -26,6 +29,19 @@ type NodeData = { type SeedNodes = Record; +type QUICClientConfig = { + crypto: { + key: ArrayBuffer; + ops: Crypto; + }; + localHost?: QUICHost; + localPort?: QUICPort; + config?: QUICConfig; + keepaliveIntervalTime?: number; + maxReadableStreamBytes?: number; + maxWritableStreamBytes?: number; +}; + export type { NodeId, NodeIdString, @@ -37,4 +53,5 @@ export type { NodeBucket, NodeData, NodeGraphSpace, + QUICClientConfig, }; diff --git a/tests/agent/GRPCClientAgent.test.ts b/tests/agent-old/GRPCClientAgent.test.ts similarity index 100% rename from tests/agent/GRPCClientAgent.test.ts rename to tests/agent-old/GRPCClientAgent.test.ts diff --git a/tests/agent/service/nodesChainDataGet.test.ts b/tests/agent-old/service/nodesChainDataGet.test.ts similarity index 100% rename from tests/agent/service/nodesChainDataGet.test.ts rename to tests/agent-old/service/nodesChainDataGet.test.ts diff --git a/tests/agent/service/nodesClosestLocalNode.test.ts b/tests/agent-old/service/nodesClosestLocalNode.test.ts similarity index 100% rename from tests/agent/service/nodesClosestLocalNode.test.ts rename to tests/agent-old/service/nodesClosestLocalNode.test.ts diff --git a/tests/agent/service/nodesCrossSignClaim.test.ts b/tests/agent-old/service/nodesCrossSignClaim.test.ts similarity index 100% rename from tests/agent/service/nodesCrossSignClaim.test.ts rename to tests/agent-old/service/nodesCrossSignClaim.test.ts diff --git a/tests/agent/service/nodesHolePunchMessage.test.ts b/tests/agent-old/service/nodesHolePunchMessage.test.ts similarity index 100% rename from tests/agent/service/nodesHolePunchMessage.test.ts rename to tests/agent-old/service/nodesHolePunchMessage.test.ts diff --git a/tests/agent/service/notificationsSend.test.ts b/tests/agent-old/service/notificationsSend.test.ts similarity index 100% rename from tests/agent/service/notificationsSend.test.ts rename to tests/agent-old/service/notificationsSend.test.ts diff --git a/tests/agent/utils.ts b/tests/agent-old/utils.ts similarity index 100% rename from tests/agent/utils.ts rename to tests/agent-old/utils.ts