diff --git a/src/bin/nodes/CommandConnections.ts b/src/bin/nodes/CommandConnections.ts new file mode 100644 index 000000000..74148b775 --- /dev/null +++ b/src/bin/nodes/CommandConnections.ts @@ -0,0 +1,87 @@ +import type PolykeyClient from '../../PolykeyClient'; +import type nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb'; +import CommandPolykey from '../CommandPolykey'; +import * as binUtils from '../utils/utils'; +import * as binProcessors from '../utils/processors'; + +class CommandAdd extends CommandPolykey { + constructor(...args: ConstructorParameters) { + super(...args); + this.name('connections'); + this.description('list all active node connections'); + this.action(async (options) => { + const { default: PolykeyClient } = await import('../../PolykeyClient'); + const utilsPB = await import('../../proto/js/polykey/v1/utils/utils_pb'); + const clientOptions = await binProcessors.processClientOptions( + options.nodePath, + options.nodeId, + options.clientHost, + options.clientPort, + this.fs, + this.logger.getChild(binProcessors.processClientOptions.name), + ); + const meta = await binProcessors.processAuthentication( + options.passwordFile, + this.fs, + ); + let pkClient: PolykeyClient; + this.exitHandlers.handlers.push(async () => { + if (pkClient != null) await pkClient.stop(); + }); + try { + pkClient = await PolykeyClient.createPolykeyClient({ + nodePath: options.nodePath, + nodeId: clientOptions.nodeId, + host: clientOptions.clientHost, + port: clientOptions.clientPort, + logger: this.logger.getChild(PolykeyClient.name), + }); + // DO things here... + // Like create the message. + const emptyMessage = new utilsPB.EmptyMessage(); + + const connections = await binUtils.retryAuthentication(async (auth) => { + const connections = pkClient.grpcClient.nodesListConnections( + emptyMessage, + auth, + ); + const connectionEntries: Array = []; + for await (const connection of connections) { + connectionEntries.push(connection.toObject()); + } + return connectionEntries; + }, meta); + if (options.format === 'human') { + const output: Array = []; + for (const connection of connections) { + const hostnameString = + connection.hostname === '' ? '' : `(${connection.hostname})`; + const hostString = `${connection.nodeId}@${connection.host}${hostnameString}:${connection.port}`; + const usageCount = connection.usageCount; + const timeout = + connection.timeout === -1 ? 'NA' : `${connection.timeout}`; + const outputLine = `${hostString}\t${usageCount}\t${timeout}`; + output.push(outputLine); + } + process.stdout.write( + binUtils.outputFormatter({ + type: 'list', + data: output, + }), + ); + } else { + process.stdout.write( + binUtils.outputFormatter({ + type: 'json', + data: connections, + }), + ); + } + } finally { + if (pkClient! != null) await pkClient.stop(); + } + }); + } +} + +export default CommandAdd; diff --git a/src/bin/nodes/CommandNodes.ts b/src/bin/nodes/CommandNodes.ts index 0866a088f..145aeee38 100644 --- a/src/bin/nodes/CommandNodes.ts +++ b/src/bin/nodes/CommandNodes.ts @@ -3,6 +3,7 @@ import CommandClaim from './CommandClaim'; import CommandFind from './CommandFind'; import CommandPing from './CommandPing'; import CommandGetAll from './CommandGetAll'; +import CommandConnections from './CommandConnections'; import CommandPolykey from '../CommandPolykey'; class CommandNodes extends CommandPolykey { @@ -15,6 +16,7 @@ class CommandNodes extends CommandPolykey { this.addCommand(new CommandFind(...args)); this.addCommand(new CommandPing(...args)); this.addCommand(new CommandGetAll(...args)); + this.addCommand(new CommandConnections(...args)); } } diff --git a/src/client/GRPCClientClient.ts b/src/client/GRPCClientClient.ts index 865b9202e..3f6844552 100644 --- a/src/client/GRPCClientClient.ts +++ b/src/client/GRPCClientClient.ts @@ -915,6 +915,25 @@ class GRPCClientClient extends GRPCClient { )(...args); } + @ready(new clientErrors.ErrorClientClientDestroyed()) + public nodesListConnections( + ...args + ): AsyncGeneratorReadableStreamClient< + nodesPB.NodeConnection, + ClientReadableStream + > { + return grpcUtils.promisifyReadableStreamCall( + this.client, + { + nodeId: this.nodeId, + host: this.host, + port: this.port, + command: this.nodesListConnections.name, + }, + this.client.nodesListConnections, + )(...args); + } + @ready(new clientErrors.ErrorClientClientDestroyed()) public identitiesAuthenticate(...args) { return grpcUtils.promisifyReadableStreamCall( diff --git a/src/client/service/index.ts b/src/client/service/index.ts index 68f98ac8c..c83cca4d7 100644 --- a/src/client/service/index.ts +++ b/src/client/service/index.ts @@ -60,6 +60,7 @@ import nodesClaim from './nodesClaim'; import nodesFind from './nodesFind'; import nodesPing from './nodesPing'; import nodesGetAll from './nodesGetAll'; +import nodesListConnections from './nodesListConnections'; import notificationsClear from './notificationsClear'; import notificationsRead from './notificationsRead'; import notificationsSend from './notificationsSend'; @@ -167,6 +168,7 @@ function createService({ nodesFind: nodesFind(container), nodesPing: nodesPing(container), nodesGetAll: nodesGetAll(container), + nodesListConnections: nodesListConnections(container), notificationsClear: notificationsClear(container), notificationsRead: notificationsRead(container), notificationsSend: notificationsSend(container), diff --git a/src/client/service/nodesListConnections.ts b/src/client/service/nodesListConnections.ts new file mode 100644 index 000000000..aed6023bb --- /dev/null +++ b/src/client/service/nodesListConnections.ts @@ -0,0 +1,52 @@ +import type * as grpc from '@grpc/grpc-js'; +import type { Authenticate } from '../types'; +import type * as utilsPB from '../../proto/js/polykey/v1/utils/utils_pb'; +import type Logger from '@matrixai/logger'; +import type NodeConnectionManager from '../../nodes/NodeConnectionManager'; +import * as grpcUtils from '../../grpc/utils'; +import * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb'; +import * as clientUtils from '../utils'; +import * as nodesUtils from '../../nodes/utils'; + +function nodesListConnections({ + authenticate, + nodeConnectionManager, + logger, +}: { + authenticate: Authenticate; + nodeConnectionManager: NodeConnectionManager; + logger: Logger; +}) { + return async ( + call: grpc.ServerWritableStream< + utilsPB.EmptyMessage, + nodesPB.NodeConnection + >, + ): Promise => { + const genWritable = grpcUtils.generatorWritable(call, false); + try { + const metadata = await authenticate(call.metadata); + call.sendMetadata(metadata); + const connections = nodeConnectionManager.listConnections(); + for (const connection of connections) { + const connectionMessage = new nodesPB.NodeConnection(); + connectionMessage.setNodeId(nodesUtils.encodeNodeId(connection.nodeId)); + connectionMessage.setHost(connection.address.host); + connectionMessage.setHostname(connection.address.hostname ?? ''); + connectionMessage.setPort(connection.address.port); + connectionMessage.setUsageCount(connection.usageCount); + connectionMessage.setTimeout(connection.timeout ?? -1); + await genWritable.next(connectionMessage); + } + await genWritable.next(null); + return; + } catch (e) { + await genWritable.throw(e); + !clientUtils.isClientClientError(e) && + logger.error(`${nodesListConnections.name}:${e}`); + return; + } + }; +} + +export default nodesListConnections; diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index aa5ae536e..a193e8374 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -1028,10 +1028,44 @@ class NodeConnectionManager { return establishedMap; } + @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) public hasConnection(nodeId: NodeId): boolean { return this.connections.has(nodeId.toString() as NodeIdString); } + @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) + public listConnections(): Array<{ + nodeId: NodeId; + address: { host: Host; port: Port; hostname: Hostname | undefined }; + usageCount: number; + timeout: number | undefined; + }> { + const results: Array<{ + nodeId: NodeId; + address: { host: Host; port: Port; hostname: Hostname | undefined }; + usageCount: number; + timeout: number | undefined; + }> = []; + for (const [ + nodeIdString, + connectionAndTimer, + ] of this.connections.entries()) { + const connection = connectionAndTimer.connection; + const nodeId = IdInternal.fromString(nodeIdString); + results.push({ + nodeId, + address: { + host: connection.host, + port: connection.port, + hostname: connection.hostname, + }, + usageCount: connectionAndTimer.usageCount, + timeout: connectionAndTimer.timer?.getTimeout(), + }); + } + return results; + } + protected hasBackoff(nodeId: NodeId): boolean { const backoff = this.nodesBackoffMap.get(nodeId.toString()); if (backoff == null) return false; diff --git a/src/proto/js/polykey/v1/client_service_grpc_pb.d.ts b/src/proto/js/polykey/v1/client_service_grpc_pb.d.ts index b230f8df4..212a15f7d 100644 --- a/src/proto/js/polykey/v1/client_service_grpc_pb.d.ts +++ b/src/proto/js/polykey/v1/client_service_grpc_pb.d.ts @@ -28,6 +28,7 @@ interface IClientServiceService extends grpc.ServiceDefinition; responseDeserialize: grpc.deserialize; } +interface IClientServiceService_INodesListConnections extends grpc.MethodDefinition { + path: "/polykey.v1.ClientService/NodesListConnections"; + requestStream: false; + responseStream: true; + requestSerialize: grpc.serialize; + requestDeserialize: grpc.deserialize; + responseSerialize: grpc.serialize; + responseDeserialize: grpc.deserialize; +} interface IClientServiceService_IKeysKeyPairRoot extends grpc.MethodDefinition { path: "/polykey.v1.ClientService/KeysKeyPairRoot"; requestStream: false; @@ -684,6 +694,7 @@ export interface IClientServiceServer extends grpc.UntypedServiceImplementation nodesClaim: grpc.handleUnaryCall; nodesFind: grpc.handleUnaryCall; nodesGetAll: grpc.handleUnaryCall; + nodesListConnections: grpc.handleServerStreamingCall; keysKeyPairRoot: grpc.handleUnaryCall; keysKeyPairReset: grpc.handleUnaryCall; keysKeyPairRenew: grpc.handleUnaryCall; @@ -770,6 +781,8 @@ export interface IClientServiceClient { nodesGetAll(request: polykey_v1_utils_utils_pb.EmptyMessage, callback: (error: grpc.ServiceError | null, response: polykey_v1_nodes_nodes_pb.NodeBuckets) => void): grpc.ClientUnaryCall; nodesGetAll(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_nodes_nodes_pb.NodeBuckets) => void): grpc.ClientUnaryCall; nodesGetAll(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, options: Partial, callback: (error: grpc.ServiceError | null, response: polykey_v1_nodes_nodes_pb.NodeBuckets) => void): grpc.ClientUnaryCall; + nodesListConnections(request: polykey_v1_utils_utils_pb.EmptyMessage, options?: Partial): grpc.ClientReadableStream; + nodesListConnections(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata?: grpc.Metadata, options?: Partial): grpc.ClientReadableStream; keysKeyPairRoot(request: polykey_v1_utils_utils_pb.EmptyMessage, callback: (error: grpc.ServiceError | null, response: polykey_v1_keys_keys_pb.KeyPair) => void): grpc.ClientUnaryCall; keysKeyPairRoot(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_keys_keys_pb.KeyPair) => void): grpc.ClientUnaryCall; keysKeyPairRoot(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, options: Partial, callback: (error: grpc.ServiceError | null, response: polykey_v1_keys_keys_pb.KeyPair) => void): grpc.ClientUnaryCall; @@ -958,6 +971,8 @@ export class ClientServiceClient extends grpc.Client implements IClientServiceCl public nodesGetAll(request: polykey_v1_utils_utils_pb.EmptyMessage, callback: (error: grpc.ServiceError | null, response: polykey_v1_nodes_nodes_pb.NodeBuckets) => void): grpc.ClientUnaryCall; public nodesGetAll(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_nodes_nodes_pb.NodeBuckets) => void): grpc.ClientUnaryCall; public nodesGetAll(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, options: Partial, callback: (error: grpc.ServiceError | null, response: polykey_v1_nodes_nodes_pb.NodeBuckets) => void): grpc.ClientUnaryCall; + public nodesListConnections(request: polykey_v1_utils_utils_pb.EmptyMessage, options?: Partial): grpc.ClientReadableStream; + public nodesListConnections(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata?: grpc.Metadata, options?: Partial): grpc.ClientReadableStream; public keysKeyPairRoot(request: polykey_v1_utils_utils_pb.EmptyMessage, callback: (error: grpc.ServiceError | null, response: polykey_v1_keys_keys_pb.KeyPair) => void): grpc.ClientUnaryCall; public keysKeyPairRoot(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_keys_keys_pb.KeyPair) => void): grpc.ClientUnaryCall; public keysKeyPairRoot(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, options: Partial, callback: (error: grpc.ServiceError | null, response: polykey_v1_keys_keys_pb.KeyPair) => void): grpc.ClientUnaryCall; diff --git a/src/proto/js/polykey/v1/client_service_grpc_pb.js b/src/proto/js/polykey/v1/client_service_grpc_pb.js index e08b6512c..bb209a08f 100644 --- a/src/proto/js/polykey/v1/client_service_grpc_pb.js +++ b/src/proto/js/polykey/v1/client_service_grpc_pb.js @@ -234,6 +234,17 @@ function deserialize_polykey_v1_nodes_NodeBuckets(buffer_arg) { return polykey_v1_nodes_nodes_pb.NodeBuckets.deserializeBinary(new Uint8Array(buffer_arg)); } +function serialize_polykey_v1_nodes_NodeConnection(arg) { + if (!(arg instanceof polykey_v1_nodes_nodes_pb.NodeConnection)) { + throw new Error('Expected argument of type polykey.v1.nodes.NodeConnection'); + } + return Buffer.from(arg.serializeBinary()); +} + +function deserialize_polykey_v1_nodes_NodeConnection(buffer_arg) { + return polykey_v1_nodes_nodes_pb.NodeConnection.deserializeBinary(new Uint8Array(buffer_arg)); +} + function serialize_polykey_v1_notifications_List(arg) { if (!(arg instanceof polykey_v1_notifications_notifications_pb.List)) { throw new Error('Expected argument of type polykey.v1.notifications.List'); @@ -590,6 +601,17 @@ nodesAdd: { responseSerialize: serialize_polykey_v1_nodes_NodeBuckets, responseDeserialize: deserialize_polykey_v1_nodes_NodeBuckets, }, + nodesListConnections: { + path: '/polykey.v1.ClientService/NodesListConnections', + requestStream: false, + responseStream: true, + requestType: polykey_v1_utils_utils_pb.EmptyMessage, + responseType: polykey_v1_nodes_nodes_pb.NodeConnection, + requestSerialize: serialize_polykey_v1_utils_EmptyMessage, + requestDeserialize: deserialize_polykey_v1_utils_EmptyMessage, + responseSerialize: serialize_polykey_v1_nodes_NodeConnection, + responseDeserialize: deserialize_polykey_v1_nodes_NodeConnection, + }, // Keys keysKeyPairRoot: { path: '/polykey.v1.ClientService/KeysKeyPairRoot', diff --git a/src/proto/js/polykey/v1/nodes/nodes_pb.d.ts b/src/proto/js/polykey/v1/nodes/nodes_pb.d.ts index c22abec29..e32af247a 100644 --- a/src/proto/js/polykey/v1/nodes/nodes_pb.d.ts +++ b/src/proto/js/polykey/v1/nodes/nodes_pb.d.ts @@ -6,7 +6,7 @@ import * as jspb from "google-protobuf"; -export class Node extends jspb.Message { +export class Node extends jspb.Message { getNodeId(): string; setNodeId(value: string): Node; @@ -26,7 +26,7 @@ export namespace Node { } } -export class Address extends jspb.Message { +export class Address extends jspb.Message { getHost(): string; setHost(value: string): Address; getPort(): number; @@ -49,7 +49,7 @@ export namespace Address { } } -export class NodeAddress extends jspb.Message { +export class NodeAddress extends jspb.Message { getNodeId(): string; setNodeId(value: string): NodeAddress; @@ -75,7 +75,7 @@ export namespace NodeAddress { } } -export class Claim extends jspb.Message { +export class Claim extends jspb.Message { getNodeId(): string; setNodeId(value: string): Claim; getForceInvite(): boolean; @@ -98,7 +98,7 @@ export namespace Claim { } } -export class NodeAdd extends jspb.Message { +export class NodeAdd extends jspb.Message { getNodeId(): string; setNodeId(value: string): NodeAdd; @@ -130,7 +130,7 @@ export namespace NodeAdd { } } -export class NodeBuckets extends jspb.Message { +export class NodeBuckets extends jspb.Message { getBucketsMap(): jspb.Map; clearBucketsMap(): void; @@ -152,7 +152,42 @@ export namespace NodeBuckets { } } -export class Connection extends jspb.Message { +export class NodeConnection extends jspb.Message { + getNodeId(): string; + setNodeId(value: string): NodeConnection; + getHost(): string; + setHost(value: string): NodeConnection; + getHostname(): string; + setHostname(value: string): NodeConnection; + getPort(): number; + setPort(value: number): NodeConnection; + getUsageCount(): number; + setUsageCount(value: number): NodeConnection; + getTimeout(): number; + setTimeout(value: number): NodeConnection; + + serializeBinary(): Uint8Array; + toObject(includeInstance?: boolean): NodeConnection.AsObject; + static toObject(includeInstance: boolean, msg: NodeConnection): NodeConnection.AsObject; + static extensions: {[key: number]: jspb.ExtensionFieldInfo}; + static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo}; + static serializeBinaryToWriter(message: NodeConnection, writer: jspb.BinaryWriter): void; + static deserializeBinary(bytes: Uint8Array): NodeConnection; + static deserializeBinaryFromReader(message: NodeConnection, reader: jspb.BinaryReader): NodeConnection; +} + +export namespace NodeConnection { + export type AsObject = { + nodeId: string, + host: string, + hostname: string, + port: number, + usageCount: number, + timeout: number, + } +} + +export class Connection extends jspb.Message { getAId(): string; setAId(value: string): Connection; getBId(): string; @@ -181,7 +216,7 @@ export namespace Connection { } } -export class Relay extends jspb.Message { +export class Relay extends jspb.Message { getSrcId(): string; setSrcId(value: string): Relay; getTargetId(): string; @@ -207,7 +242,7 @@ export namespace Relay { } } -export class NodeTable extends jspb.Message { +export class NodeTable extends jspb.Message { getNodeTableMap(): jspb.Map; clearNodeTableMap(): void; @@ -229,7 +264,7 @@ export namespace NodeTable { } } -export class ClaimType extends jspb.Message { +export class ClaimType extends jspb.Message { getClaimType(): string; setClaimType(value: string): ClaimType; @@ -249,7 +284,7 @@ export namespace ClaimType { } } -export class Claims extends jspb.Message { +export class Claims extends jspb.Message { clearClaimsList(): void; getClaimsList(): Array; setClaimsList(value: Array): Claims; @@ -271,7 +306,7 @@ export namespace Claims { } } -export class ChainData extends jspb.Message { +export class ChainData extends jspb.Message { getChainDataMap(): jspb.Map; clearChainDataMap(): void; @@ -293,7 +328,7 @@ export namespace ChainData { } } -export class AgentClaim extends jspb.Message { +export class AgentClaim extends jspb.Message { getPayload(): string; setPayload(value: string): AgentClaim; clearSignaturesList(): void; @@ -318,7 +353,7 @@ export namespace AgentClaim { } } -export class Signature extends jspb.Message { +export class Signature extends jspb.Message { getSignature(): string; setSignature(value: string): Signature; getProtected(): string; @@ -341,7 +376,7 @@ export namespace Signature { } } -export class ClaimIntermediary extends jspb.Message { +export class ClaimIntermediary extends jspb.Message { getPayload(): string; setPayload(value: string): ClaimIntermediary; @@ -367,7 +402,7 @@ export namespace ClaimIntermediary { } } -export class CrossSign extends jspb.Message { +export class CrossSign extends jspb.Message { hasSinglySignedClaim(): boolean; clearSinglySignedClaim(): void; diff --git a/src/proto/js/polykey/v1/nodes/nodes_pb.js b/src/proto/js/polykey/v1/nodes/nodes_pb.js index 659d9bc87..448183d8e 100644 --- a/src/proto/js/polykey/v1/nodes/nodes_pb.js +++ b/src/proto/js/polykey/v1/nodes/nodes_pb.js @@ -28,6 +28,7 @@ goog.exportSymbol('proto.polykey.v1.nodes.Node', null, global); goog.exportSymbol('proto.polykey.v1.nodes.NodeAdd', null, global); goog.exportSymbol('proto.polykey.v1.nodes.NodeAddress', null, global); goog.exportSymbol('proto.polykey.v1.nodes.NodeBuckets', null, global); +goog.exportSymbol('proto.polykey.v1.nodes.NodeConnection', null, global); goog.exportSymbol('proto.polykey.v1.nodes.NodeTable', null, global); goog.exportSymbol('proto.polykey.v1.nodes.Relay', null, global); goog.exportSymbol('proto.polykey.v1.nodes.Signature', null, global); @@ -157,6 +158,27 @@ if (goog.DEBUG && !COMPILED) { */ proto.polykey.v1.nodes.NodeBuckets.displayName = 'proto.polykey.v1.nodes.NodeBuckets'; } +/** + * Generated by JsPbCodeGenerator. + * @param {Array=} opt_data Optional initial data array, typically from a + * server response, or constructed directly in Javascript. The array is used + * in place and becomes part of the constructed object. It is not cloned. + * If no data is provided, the constructed object will be empty, but still + * valid. + * @extends {jspb.Message} + * @constructor + */ +proto.polykey.v1.nodes.NodeConnection = function(opt_data) { + jspb.Message.initialize(this, opt_data, 0, -1, null, null); +}; +goog.inherits(proto.polykey.v1.nodes.NodeConnection, jspb.Message); +if (goog.DEBUG && !COMPILED) { + /** + * @public + * @override + */ + proto.polykey.v1.nodes.NodeConnection.displayName = 'proto.polykey.v1.nodes.NodeConnection'; +} /** * Generated by JsPbCodeGenerator. * @param {Array=} opt_data Optional initial data array, typically from a @@ -1375,6 +1397,286 @@ proto.polykey.v1.nodes.NodeBuckets.prototype.clearBucketsMap = function() { +if (jspb.Message.GENERATE_TO_OBJECT) { +/** + * Creates an object representation of this proto. + * Field names that are reserved in JavaScript and will be renamed to pb_name. + * Optional fields that are not set will be set to undefined. + * To access a reserved field use, foo.pb_, eg, foo.pb_default. + * For the list of reserved names please see: + * net/proto2/compiler/js/internal/generator.cc#kKeyword. + * @param {boolean=} opt_includeInstance Deprecated. whether to include the + * JSPB instance for transitional soy proto support: + * http://goto/soy-param-migration + * @return {!Object} + */ +proto.polykey.v1.nodes.NodeConnection.prototype.toObject = function(opt_includeInstance) { + return proto.polykey.v1.nodes.NodeConnection.toObject(opt_includeInstance, this); +}; + + +/** + * Static version of the {@see toObject} method. + * @param {boolean|undefined} includeInstance Deprecated. Whether to include + * the JSPB instance for transitional soy proto support: + * http://goto/soy-param-migration + * @param {!proto.polykey.v1.nodes.NodeConnection} msg The msg instance to transform. + * @return {!Object} + * @suppress {unusedLocalVariables} f is only used for nested messages + */ +proto.polykey.v1.nodes.NodeConnection.toObject = function(includeInstance, msg) { + var f, obj = { + nodeId: jspb.Message.getFieldWithDefault(msg, 1, ""), + host: jspb.Message.getFieldWithDefault(msg, 2, ""), + hostname: jspb.Message.getFieldWithDefault(msg, 3, ""), + port: jspb.Message.getFieldWithDefault(msg, 4, 0), + usageCount: jspb.Message.getFieldWithDefault(msg, 5, 0), + timeout: jspb.Message.getFieldWithDefault(msg, 6, 0) + }; + + if (includeInstance) { + obj.$jspbMessageInstance = msg; + } + return obj; +}; +} + + +/** + * Deserializes binary data (in protobuf wire format). + * @param {jspb.ByteSource} bytes The bytes to deserialize. + * @return {!proto.polykey.v1.nodes.NodeConnection} + */ +proto.polykey.v1.nodes.NodeConnection.deserializeBinary = function(bytes) { + var reader = new jspb.BinaryReader(bytes); + var msg = new proto.polykey.v1.nodes.NodeConnection; + return proto.polykey.v1.nodes.NodeConnection.deserializeBinaryFromReader(msg, reader); +}; + + +/** + * Deserializes binary data (in protobuf wire format) from the + * given reader into the given message object. + * @param {!proto.polykey.v1.nodes.NodeConnection} msg The message object to deserialize into. + * @param {!jspb.BinaryReader} reader The BinaryReader to use. + * @return {!proto.polykey.v1.nodes.NodeConnection} + */ +proto.polykey.v1.nodes.NodeConnection.deserializeBinaryFromReader = function(msg, reader) { + while (reader.nextField()) { + if (reader.isEndGroup()) { + break; + } + var field = reader.getFieldNumber(); + switch (field) { + case 1: + var value = /** @type {string} */ (reader.readString()); + msg.setNodeId(value); + break; + case 2: + var value = /** @type {string} */ (reader.readString()); + msg.setHost(value); + break; + case 3: + var value = /** @type {string} */ (reader.readString()); + msg.setHostname(value); + break; + case 4: + var value = /** @type {number} */ (reader.readInt32()); + msg.setPort(value); + break; + case 5: + var value = /** @type {number} */ (reader.readInt32()); + msg.setUsageCount(value); + break; + case 6: + var value = /** @type {number} */ (reader.readInt32()); + msg.setTimeout(value); + break; + default: + reader.skipField(); + break; + } + } + return msg; +}; + + +/** + * Serializes the message to binary data (in protobuf wire format). + * @return {!Uint8Array} + */ +proto.polykey.v1.nodes.NodeConnection.prototype.serializeBinary = function() { + var writer = new jspb.BinaryWriter(); + proto.polykey.v1.nodes.NodeConnection.serializeBinaryToWriter(this, writer); + return writer.getResultBuffer(); +}; + + +/** + * Serializes the given message to binary data (in protobuf wire + * format), writing to the given BinaryWriter. + * @param {!proto.polykey.v1.nodes.NodeConnection} message + * @param {!jspb.BinaryWriter} writer + * @suppress {unusedLocalVariables} f is only used for nested messages + */ +proto.polykey.v1.nodes.NodeConnection.serializeBinaryToWriter = function(message, writer) { + var f = undefined; + f = message.getNodeId(); + if (f.length > 0) { + writer.writeString( + 1, + f + ); + } + f = message.getHost(); + if (f.length > 0) { + writer.writeString( + 2, + f + ); + } + f = message.getHostname(); + if (f.length > 0) { + writer.writeString( + 3, + f + ); + } + f = message.getPort(); + if (f !== 0) { + writer.writeInt32( + 4, + f + ); + } + f = message.getUsageCount(); + if (f !== 0) { + writer.writeInt32( + 5, + f + ); + } + f = message.getTimeout(); + if (f !== 0) { + writer.writeInt32( + 6, + f + ); + } +}; + + +/** + * optional string node_id = 1; + * @return {string} + */ +proto.polykey.v1.nodes.NodeConnection.prototype.getNodeId = function() { + return /** @type {string} */ (jspb.Message.getFieldWithDefault(this, 1, "")); +}; + + +/** + * @param {string} value + * @return {!proto.polykey.v1.nodes.NodeConnection} returns this + */ +proto.polykey.v1.nodes.NodeConnection.prototype.setNodeId = function(value) { + return jspb.Message.setProto3StringField(this, 1, value); +}; + + +/** + * optional string host = 2; + * @return {string} + */ +proto.polykey.v1.nodes.NodeConnection.prototype.getHost = function() { + return /** @type {string} */ (jspb.Message.getFieldWithDefault(this, 2, "")); +}; + + +/** + * @param {string} value + * @return {!proto.polykey.v1.nodes.NodeConnection} returns this + */ +proto.polykey.v1.nodes.NodeConnection.prototype.setHost = function(value) { + return jspb.Message.setProto3StringField(this, 2, value); +}; + + +/** + * optional string hostname = 3; + * @return {string} + */ +proto.polykey.v1.nodes.NodeConnection.prototype.getHostname = function() { + return /** @type {string} */ (jspb.Message.getFieldWithDefault(this, 3, "")); +}; + + +/** + * @param {string} value + * @return {!proto.polykey.v1.nodes.NodeConnection} returns this + */ +proto.polykey.v1.nodes.NodeConnection.prototype.setHostname = function(value) { + return jspb.Message.setProto3StringField(this, 3, value); +}; + + +/** + * optional int32 port = 4; + * @return {number} + */ +proto.polykey.v1.nodes.NodeConnection.prototype.getPort = function() { + return /** @type {number} */ (jspb.Message.getFieldWithDefault(this, 4, 0)); +}; + + +/** + * @param {number} value + * @return {!proto.polykey.v1.nodes.NodeConnection} returns this + */ +proto.polykey.v1.nodes.NodeConnection.prototype.setPort = function(value) { + return jspb.Message.setProto3IntField(this, 4, value); +}; + + +/** + * optional int32 usage_count = 5; + * @return {number} + */ +proto.polykey.v1.nodes.NodeConnection.prototype.getUsageCount = function() { + return /** @type {number} */ (jspb.Message.getFieldWithDefault(this, 5, 0)); +}; + + +/** + * @param {number} value + * @return {!proto.polykey.v1.nodes.NodeConnection} returns this + */ +proto.polykey.v1.nodes.NodeConnection.prototype.setUsageCount = function(value) { + return jspb.Message.setProto3IntField(this, 5, value); +}; + + +/** + * optional int32 timeout = 6; + * @return {number} + */ +proto.polykey.v1.nodes.NodeConnection.prototype.getTimeout = function() { + return /** @type {number} */ (jspb.Message.getFieldWithDefault(this, 6, 0)); +}; + + +/** + * @param {number} value + * @return {!proto.polykey.v1.nodes.NodeConnection} returns this + */ +proto.polykey.v1.nodes.NodeConnection.prototype.setTimeout = function(value) { + return jspb.Message.setProto3IntField(this, 6, value); +}; + + + + + if (jspb.Message.GENERATE_TO_OBJECT) { /** * Creates an object representation of this proto. diff --git a/src/proto/schemas/polykey/v1/client_service.proto b/src/proto/schemas/polykey/v1/client_service.proto index 9c90e0286..4f78a1d28 100644 --- a/src/proto/schemas/polykey/v1/client_service.proto +++ b/src/proto/schemas/polykey/v1/client_service.proto @@ -27,6 +27,7 @@ service ClientService { rpc NodesClaim(polykey.v1.nodes.Claim) returns (polykey.v1.utils.StatusMessage); rpc NodesFind(polykey.v1.nodes.Node) returns (polykey.v1.nodes.NodeAddress); rpc NodesGetAll(polykey.v1.utils.EmptyMessage) returns (polykey.v1.nodes.NodeBuckets); + rpc NodesListConnections(polykey.v1.utils.EmptyMessage) returns (stream polykey.v1.nodes.NodeConnection); // Keys rpc KeysKeyPairRoot (polykey.v1.utils.EmptyMessage) returns (polykey.v1.keys.KeyPair); diff --git a/src/proto/schemas/polykey/v1/nodes/nodes.proto b/src/proto/schemas/polykey/v1/nodes/nodes.proto index 3b2f18fa6..52256b140 100644 --- a/src/proto/schemas/polykey/v1/nodes/nodes.proto +++ b/src/proto/schemas/polykey/v1/nodes/nodes.proto @@ -37,6 +37,15 @@ message NodeBuckets { map buckets = 1; } +message NodeConnection { + string node_id = 1; + string host = 2; + string hostname = 3; + int32 port = 4; + int32 usage_count = 5; + int32 timeout = 6; +} + // Agent specific. message Connection { diff --git a/tests/nodes/NodeConnectionManager.lifecycle.test.ts b/tests/nodes/NodeConnectionManager.lifecycle.test.ts index 18ea53622..6456a731d 100644 --- a/tests/nodes/NodeConnectionManager.lifecycle.test.ts +++ b/tests/nodes/NodeConnectionManager.lifecycle.test.ts @@ -286,6 +286,32 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { await nodeConnectionManager?.stop(); } }); + test('should list active connections', async () => { + // NodeConnectionManager under test + let nodeConnectionManager: NodeConnectionManager | undefined; + try { + nodeConnectionManager = new NodeConnectionManager({ + keyManager, + nodeGraph, + proxy, + taskManager, + logger: nodeConnectionManagerLogger, + }); + await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); + await taskManager.startProcessing(); + await nodeConnectionManager.withConnF(remoteNodeId1, async () => { + // Do nothing + expect(nodeConnectionManager?.listConnections()).toHaveLength(1); + }); + await nodeConnectionManager.withConnF(remoteNodeId2, async () => { + // Do nothing + expect(nodeConnectionManager?.listConnections()).toHaveLength(2); + }); + expect(nodeConnectionManager?.listConnections()).toHaveLength(2); + } finally { + await nodeConnectionManager?.stop(); + } + }); test('withConnG should create connection', async () => { // NodeConnectionManager under test let nodeConnectionManager: NodeConnectionManager | undefined;