diff --git a/src/controllers/ExtensionController.ts b/src/controllers/ExtensionController.ts index 4c2ff729..656bd813 100644 --- a/src/controllers/ExtensionController.ts +++ b/src/controllers/ExtensionController.ts @@ -78,6 +78,7 @@ import { type AuthenticatedClient as DheAuthenticatedClient, type UnauthenticatedClient as DheUnauthenticatedClient, } from '@deephaven-enterprise/auth-nodejs'; +import { NodeHttp2gRPCTransport } from '../dh/NodeHttp2gRPCTransport'; const logger = new Logger('ExtensionController'); @@ -333,9 +334,10 @@ export class ExtensionController implements Disposable { assertDefined(this._coreJsApiCache, 'coreJsApiCache'); const dhc = await this._coreJsApiCache.get(url); - const client = new dhc.CoreClient( - url.toString() - ) as CoreUnauthenticatedClient; + const client = new dhc.CoreClient(url.toString(), { + debug: true, + transportFactory: NodeHttp2gRPCTransport.factory, + }) as CoreUnauthenticatedClient; // Attach a dispose method so that client caches can dispose of the client return Object.assign(client, { diff --git a/src/dh/NodeHttp2gRPCTransport.ts b/src/dh/NodeHttp2gRPCTransport.ts new file mode 100644 index 00000000..4d3fb691 --- /dev/null +++ b/src/dh/NodeHttp2gRPCTransport.ts @@ -0,0 +1,139 @@ +import http2 from 'node:http2'; +import type { dh as DhcType } from '@deephaven/jsapi-types'; +import { assertDefined } from '../util'; + +type GrpcTransport = DhcType.grpc.GrpcTransport; +type GrpcTransportFactory = DhcType.grpc.GrpcTransportFactory; +type GrpcTransportOptions = DhcType.grpc.GrpcTransportOptions; + +export class NodeHttp2gRPCTransport implements GrpcTransport { + static _sessionMap: Map = new Map(); + + /** + * TODO: Cleanup requests similar to https://github.com/deephaven/deephaven-core/blob/c05b35957e466fded4da61154ba106cfc3198bc5/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java#L129 + * Create a Transport instance. + * @param options Transport options. + * @returns Transport instance. + */ + static readonly factory: GrpcTransportFactory = { + create: options => { + const { origin } = new URL(options.url); + + if (!NodeHttp2gRPCTransport._sessionMap.has(origin)) { + const session = http2.connect(origin); + session.on('error', err => { + console.error('Session error', err); + }); + NodeHttp2gRPCTransport._sessionMap.set(origin, session); + } + + const session = NodeHttp2gRPCTransport._sessionMap.get(origin)!; + + return new NodeHttp2gRPCTransport(options, session); + }, + + get supportsClientStreaming(): boolean { + return false; + }, + }; + + /** + * Private constructor to restrict instantiation to static factory method. + * @param options Transport options. + * @param session node:http2 session. + */ + private constructor( + options: GrpcTransportOptions, + session: http2.ClientHttp2Session + ) { + this._options = options; + this._session = session; + } + + private readonly _options: GrpcTransportOptions; + private readonly _session: http2.ClientHttp2Session; + private _request: http2.ClientHttp2Stream | null = null; + + _createRequest = ( + headers: Record | null + ): http2.ClientHttp2Stream => { + const url = new URL(this._options.url); + + const req = this._session.request({ + ...headers, + // may need to set the :authority header at some point + ':method': 'POST', + ':path': url.pathname, + }); + + console.log('[NodeHttp2Transport] _createRequest', url.pathname); + + req.on('response', (headers, _flags) => { + const headersRecord: Record = {}; + + // strip any undefined headers or keys that start with `:` + for (const name in headers) { + if (headers[name] != null && !name.startsWith(':')) { + headersRecord[name] = headers[name]; + } + } + + this._options.onHeaders(headersRecord, Number(headers[':status'])); + }); + + req.on('data', (chunk: Buffer) => { + this._options.onChunk(chunk); + }); + req.on('end', () => { + this._options.onEnd(); + }); + req.on('error', err => { + this._options.onEnd(err); + }); + + return req; + }; + + start(metadata: { [key: string]: string | Array }): void { + console.log('[NodeHttp2Transport] start', metadata.headersMap); + + if (this._request != null) { + throw new Error('start called more than once'); + } + + const headers: Record = {}; + Object.entries(metadata).forEach(([key, value]) => { + headers[key] = typeof value === 'string' ? value : value.join(', '); + }); + + this._request = this._createRequest(headers); + } + + sendMessage(msgBytes: Uint8Array): void { + console.log('[NodeHttp2Transport] sendMessage', msgBytes); + assertDefined(this._request, '_request'); + this._request.write(msgBytes); + } + + finishSend(): void { + console.log('[NodeHttp2Transport] finishSend'); + assertDefined(this._request, '_request'); + this._request.end(); + } + + cancel(): void { + console.log('[NodeHttp2Transport] cancel'); + assertDefined(this._request, '_request'); + this._request.close(); + } + + /** + * Cleanup. + */ + static dispose(): void { + for (const session of NodeHttp2gRPCTransport._sessionMap.values()) { + session.close(); + } + NodeHttp2gRPCTransport._sessionMap.clear(); + } +} diff --git a/src/extension.ts b/src/extension.ts index 14aef31b..3f40f132 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -2,6 +2,8 @@ import * as vscode from 'vscode'; import { ExtensionController } from './controllers'; import { ConfigService } from './services'; +process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; + export function activate(context: vscode.ExtensionContext): void { const controller = new ExtensionController(context, ConfigService);