Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Node http2 gRPC transport #181

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions src/controllers/ExtensionController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,20 @@ import {
type AuthenticatedClient as DheAuthenticatedClient,
type UnauthenticatedClient as DheUnauthenticatedClient,
} from '@deephaven-enterprise/auth-nodejs';
import { NodeHttp2gRPCTransport } from '../dh/NodeHttp2gRPCTransport';
import type { GrpcTransportFactory } from '../dh/grpc';

const logger = new Logger('ExtensionController');

declare module '@deephaven/jsapi-types' {
export namespace dh {
export interface ConnectOptions {
debug?: boolean;
transportFactory?: GrpcTransportFactory;
}
}
}

export class ExtensionController implements Disposable {
constructor(context: vscode.ExtensionContext, configService: IConfigService) {
this._context = context;
Expand Down Expand Up @@ -333,9 +344,12 @@ export class ExtensionController implements Disposable {
assertDefined(this._coreJsApiCache, 'coreJsApiCache');
const dhc = await this._coreJsApiCache.get(url);

const client = new dhc.CoreClient(
url.toString()
) as CoreUnauthenticatedClient;
const client = new dhc.CoreClient(url.toString(), {
debug: true,
// TODO: This should be optional, but types aren't happy yet
headers: {},
transportFactory: NodeHttp2gRPCTransport.factory,
}) as CoreUnauthenticatedClient;

// Attach a dispose method so that client caches can dispose of the client
return Object.assign(client, {
Expand Down
139 changes: 139 additions & 0 deletions src/dh/NodeHttp2gRPCTransport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import http2 from 'node:http2';
import type {
GrpcTransport,
GrpcTransportFactory,
GrpcTransportOptions,
} from './grpc';
import { assertDefined } from '../util';

export class NodeHttp2gRPCTransport implements GrpcTransport {
static _sessionMap: Map<string, http2.ClientHttp2Session> = new Map();

/**
* TODO: Cleanup requests similar to https://github.com/deephaven/deephaven-core/blob/c05b35957e466fded4da61154ba106cfc3198bc5/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java#L129
* Create a Transport instance.
* @param options Transport options.
* @returns Transport instance.
*/
static readonly factory: GrpcTransportFactory = {
create: options => {
const { origin } = new URL(options.url);

if (!NodeHttp2gRPCTransport._sessionMap.has(origin)) {
const session = http2.connect(origin);
session.on('error', err => {
console.error('Session error', err);

Check warning on line 25 in src/dh/NodeHttp2gRPCTransport.ts

View workflow job for this annotation

GitHub Actions / call-unit / unit

Unexpected console statement
});
NodeHttp2gRPCTransport._sessionMap.set(origin, session);
}

const session = NodeHttp2gRPCTransport._sessionMap.get(origin)!;

return new NodeHttp2gRPCTransport(options, session);
},

get supportsClientStreaming(): boolean {
return false;
},
};

/**
* Private constructor to restrict instantiation to static factory method.
* @param options Transport options.
* @param session node:http2 session.
*/
private constructor(
options: GrpcTransportOptions,
session: http2.ClientHttp2Session
) {
this._options = options;
this._session = session;
}

private readonly _options: GrpcTransportOptions;
private readonly _session: http2.ClientHttp2Session;
private _request: http2.ClientHttp2Stream | null = null;

_createRequest = (
headers: Record<string, string> | null
): http2.ClientHttp2Stream => {
const url = new URL(this._options.url);

const req = this._session.request({
...headers,
// may need to set the :authority header at some point
':method': 'POST',

Check warning on line 65 in src/dh/NodeHttp2gRPCTransport.ts

View workflow job for this annotation

GitHub Actions / call-unit / unit

Object Literal Property name `:method` must match one of the following formats: camelCase
':path': url.pathname,

Check warning on line 66 in src/dh/NodeHttp2gRPCTransport.ts

View workflow job for this annotation

GitHub Actions / call-unit / unit

Object Literal Property name `:path` must match one of the following formats: camelCase
});

console.log('[NodeHttp2Transport] _createRequest', url.pathname);

Check warning on line 69 in src/dh/NodeHttp2gRPCTransport.ts

View workflow job for this annotation

GitHub Actions / call-unit / unit

Unexpected console statement

req.on('response', (headers, _flags) => {
const headersRecord: Record<string, string | string[]> = {};

// strip any undefined headers or keys that start with `:`
for (const name in headers) {
if (headers[name] != null && !name.startsWith(':')) {
headersRecord[name] = headers[name];
}
}

this._options.onHeaders(headersRecord, Number(headers[':status']));
});

req.on('data', (chunk: Buffer) => {
this._options.onChunk(chunk);
});
req.on('end', () => {
this._options.onEnd();
});
req.on('error', err => {
this._options.onEnd(err);
});

return req;
};

start(metadata: { [key: string]: string | Array<string> }): void {
console.log('[NodeHttp2Transport] start', metadata.headersMap);

Check warning on line 98 in src/dh/NodeHttp2gRPCTransport.ts

View workflow job for this annotation

GitHub Actions / call-unit / unit

Unexpected console statement

if (this._request != null) {
throw new Error('start called more than once');
}

const headers: Record<string, string> = {};
Object.entries(metadata).forEach(([key, value]) => {
headers[key] = typeof value === 'string' ? value : value.join(', ');
});

this._request = this._createRequest(headers);
}

sendMessage(msgBytes: Uint8Array): void {
console.log('[NodeHttp2Transport] sendMessage', msgBytes);

Check warning on line 113 in src/dh/NodeHttp2gRPCTransport.ts

View workflow job for this annotation

GitHub Actions / call-unit / unit

Unexpected console statement
assertDefined(this._request, '_request');
this._request.write(msgBytes);
}

finishSend(): void {
console.log('[NodeHttp2Transport] finishSend');

Check warning on line 119 in src/dh/NodeHttp2gRPCTransport.ts

View workflow job for this annotation

GitHub Actions / call-unit / unit

Unexpected console statement
assertDefined(this._request, '_request');
this._request.end();
}

cancel(): void {
console.log('[NodeHttp2Transport] cancel');

Check warning on line 125 in src/dh/NodeHttp2gRPCTransport.ts

View workflow job for this annotation

GitHub Actions / call-unit / unit

Unexpected console statement
assertDefined(this._request, '_request');
this._request.close();
}

/**
* Cleanup.
*/
static dispose(): void {
for (const session of NodeHttp2gRPCTransport._sessionMap.values()) {
session.close();
}
NodeHttp2gRPCTransport._sessionMap.clear();
}
}
73 changes: 73 additions & 0 deletions src/dh/grpc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Factory for creating gRPC transports.
*/
export interface GrpcTransportFactory {
/**
* Create a new transport instance.
* @param options - options for creating the transport
* @return a transport instance to use for gRPC communication
*/
create(options: GrpcTransportOptions): GrpcTransport;
/**
* Return true to signal that created transports may have {@link GrpcTransport.sendMessage} called on it
* more than once before {@link GrpcTransport.finishSend} should be called.
* @return true to signal that the implementation can stream multiple messages, false otherwise indicating that
* Open/Next gRPC calls should be used
*/
get supportsClientStreaming(): boolean;
}
/**
* Options for creating a gRPC stream transport instance.
*/
export interface GrpcTransportOptions {
/**
* The gRPC method URL.
*/
url: URL;
/**
* True to enable debug logging for this stream.
*/
debug: boolean;
/**
* Callback for when headers and status are received. The headers are a map of header names to values, and the
* status is the HTTP status code. If the connection could not be made, the status should be 0.
*/
onHeaders: (
headers: { [key: string]: string | Array<string> },
status: number
) => void;
/**
* Callback for when a chunk of data is received.
*/
onChunk: (chunk: Uint8Array) => void;
/**
* Callback for when the stream ends, with an error instance if it can be provided. Note that the present
* implementation does not consume errors, even if provided.
*/
onEnd: (error?: Error | undefined | null) => void;
}
/**
* gRPC transport implementation.
*/
export interface GrpcTransport {
/**
* Starts the stream, sending metadata to the server.
* @param metadata - the headers to send the server when opening the connection
*/
start(metadata: { [key: string]: string | Array<string> }): void;
/**
* Sends a message to the server.
* @param msgBytes - bytes to send to the server
*/
sendMessage(msgBytes: Uint8Array): void;
/**
* "Half close" the stream, signaling to the server that no more messages will be sent, but that the client is still
* open to receiving messages.
*/
finishSend(): void;
/**
* End the stream, both notifying the server that no more messages will be sent nor received, and preventing the
* client from receiving any more events.
*/
cancel(): void;
}
2 changes: 2 additions & 0 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import * as vscode from 'vscode';
import { ExtensionController } from './controllers';
import { ConfigService } from './services';

process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';

export function activate(context: vscode.ExtensionContext): void {
const controller = new ExtensionController(context, ConfigService);

Expand Down
Loading