Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: JS clients can provide custom grpc transports (#6476) #6479

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
package io.deephaven.web.client.api;

import elemental2.core.Function;
import io.deephaven.web.client.api.grpc.GrpcTransportFactory;
import jsinterop.annotations.JsIgnore;
import jsinterop.annotations.JsNullable;
import jsinterop.annotations.JsType;
Expand All @@ -29,20 +29,23 @@ public class ConnectOptions {

/**
* Set this to true to force the use of websockets when connecting to the deephaven instance, false to force the use
* of {@code fetch}.
* of {@code fetch}. Ignored if {@link #transportFactory} is set.
* <p>
* Defaults to null, indicating that the server URL should be checked to see if we connect with fetch or websockets.
*/
@JsNullable
public Boolean useWebsockets;

// TODO (deephaven-core#6214) provide our own grpc-web library that can replace fetch
// /**
// * Optional fetch implementation to use instead of the global {@code fetch()} call, allowing callers to provide a
// * polyfill rather than add a new global.
// */
// @JsNullable
// public Function fetch;
/**
* The transport factory to use for creating gRPC streams. If specified, the JS API will ignore
* {@link #useWebsockets} and its own internal logic for determining the appropriate transport to use.
* <p>
* Defaults to null, indicating that the JS API should determine the appropriate transport to use. If
* {@code useWebsockets} is set to true, the JS API will use websockets, otherwise if the server url begins with
* https, it will use fetch, otherwise it will use websockets.
*/
@JsNullable
public GrpcTransportFactory transportFactory;

public ConnectOptions() {

Expand All @@ -65,5 +68,8 @@ public ConnectOptions(Object connectOptions) {
// if (map.has("fetch")) {
// fetch = map.getAsAny("fetch").uncheckedCast();
// }
if (map.has("transportFactory")) {
transportFactory = map.getAsAny("transportFactory").uncheckedCast();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
import elemental2.core.JsArray;
import elemental2.core.JsSet;
import elemental2.promise.Promise;
import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc;
import io.deephaven.javascript.proto.dhinternal.grpcweb.client.RpcOptions;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationResponse;
import io.deephaven.web.client.api.event.HasEventHandling;
import io.deephaven.web.client.api.grpc.MultiplexedWebsocketTransport;
import io.deephaven.web.client.ide.IdeSession;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.*;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.ticket_pb.Ticket;
Expand Down Expand Up @@ -246,12 +244,8 @@ public void disconnected() {

public abstract void notifyServerShutdown(TerminationNotificationResponse success);

public boolean useWebsockets() {
Boolean useWebsockets = getOptions().useWebsockets;
if (useWebsockets == null) {
useWebsockets = getServerUrl().startsWith("http:");
}
return useWebsockets;
public boolean supportsClientStreaming() {
return getOptions().transportFactory.getSupportsClientStreaming();
}

public <T> T createClient(BiFunction<String, Object, T> constructor) {
Expand All @@ -261,12 +255,7 @@ public <T> T createClient(BiFunction<String, Object, T> constructor) {
public RpcOptions makeRpcOptions() {
RpcOptions options = RpcOptions.create();
options.setDebug(getOptions().debug);
if (useWebsockets()) {
// Replace with our custom websocket impl, with fallback to the built-in one
options.setTransport(o -> new MultiplexedWebsocketTransport(o, () -> {
Grpc.setDefaultTransport.onInvoke(Grpc.WebsocketTransport.onInvoke());
}));
}
options.setTransport(getOptions().transportFactory.adapt());
return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ public BrowserHeaders metadata() {
}

public <ReqT, RespT> BiDiStream.Factory<ReqT, RespT> streamFactory() {
return new BiDiStream.Factory<>(info.useWebsockets(), this::metadata, config::newTicketInt);
return new BiDiStream.Factory<>(info.supportsClientStreaming(), this::metadata, config::newTicketInt);
}

public Promise<JsTable> newTable(String[] columnNames, String[] types, Object[][] data, String userTimeZone,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ public interface NextStreamMessageFactory<Req> {
void nextStreamMessage(Req nextPayload, BrowserHeaders headers, JsBiConsumer<Object, Object> callback);
}
public static class Factory<ReqT, RespT> {
private final boolean useWebsockets;
private final boolean supportsClientStreaming;
private final Supplier<BrowserHeaders> headers;
private final IntSupplier nextIntTicket;

public Factory(boolean useWebsockets, Supplier<BrowserHeaders> headers, IntSupplier nextIntTicket) {
this.useWebsockets = useWebsockets;
public Factory(boolean supportsClientStreaming, Supplier<BrowserHeaders> headers, IntSupplier nextIntTicket) {
this.supportsClientStreaming = supportsClientStreaming;
this.headers = headers;
this.nextIntTicket = nextIntTicket;
}
Expand All @@ -51,8 +51,8 @@ public BiDiStream<ReqT, RespT> create(
OpenStreamFactory<ReqT> openEmulatedStream,
NextStreamMessageFactory<ReqT> nextEmulatedStream,
ReqT emptyReq) {
if (useWebsockets) {
return websocket(bidirectionalStream.openBiDiStream(headers.get()));
if (supportsClientStreaming) {
return bidi(bidirectionalStream.openBiDiStream(headers.get()));
} else {
return new EmulatedBiDiStream<>(
openEmulatedStream,
Expand All @@ -73,7 +73,7 @@ public static <Req, Resp> BiDiStream<Req, Resp> of(
IntSupplier nextIntTicket,
boolean useWebsocket) {
if (useWebsocket) {
return websocket(bidirectionalStream.openBiDiStream(headers.get()));
return bidi(bidirectionalStream.openBiDiStream(headers.get()));
} else {
return new EmulatedBiDiStream<>(
openEmulatedStream,
Expand All @@ -84,7 +84,7 @@ public static <Req, Resp> BiDiStream<Req, Resp> of(
}
}

public static <Req, Resp> BiDiStream<Req, Resp> websocket(Object bidirectionalStream) {
public static <Req, Resp> BiDiStream<Req, Resp> bidi(Object bidirectionalStream) {
return new WebsocketBiDiStream<>(Js.cast(bidirectionalStream));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.web.client.api.grpc;

import com.vertispan.tsdefs.annotations.TsInterface;
import elemental2.core.Uint8Array;
import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders;
import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Transport;
import jsinterop.annotations.JsIgnore;
import jsinterop.annotations.JsType;
import jsinterop.base.JsPropertyMap;

/**
* gRPC transport implementation.
*
*/
@JsType(namespace = "dh.grpc")
@TsInterface
public interface GrpcTransport {
/**
* Starts the stream, sending metadata to the server.
*
* @param metadata the headers to send the server when opening the connection
*/
void start(JsPropertyMap<HeaderValueUnion> metadata);

/**
* Sends a message to the server.
*
* @param msgBytes bytes to send to the server
*/
void sendMessage(Uint8Array msgBytes);

/**
* "Half close" the stream, signaling to the server that no more messages will be sent, but that the client is still
* open to receiving messages.
*/
void finishSend();

/**
* End the stream, both notifying the server that no more messages will be sent nor received, and preventing the
* client from receiving any more events.
*/
void cancel();

/**
* Helper to transform ts implementations to our own api.
*/
@JsIgnore
static GrpcTransport from(Transport tsTransport) {
return new GrpcTransport() {
@Override
public void start(JsPropertyMap<HeaderValueUnion> metadata) {
tsTransport.start(new BrowserHeaders(metadata));
}

@Override
public void sendMessage(Uint8Array msgBytes) {
tsTransport.sendMessage(msgBytes);
}

@Override
public void finishSend() {
tsTransport.finishSend();
}

@Override
public void cancel() {
tsTransport.cancel();
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.web.client.api.grpc;

import com.vertispan.tsdefs.annotations.TsInterface;
import elemental2.core.Uint8Array;
import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders;
import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.Transport;
import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportFactory;
import jsinterop.annotations.JsOverlay;
import jsinterop.annotations.JsProperty;
import jsinterop.annotations.JsType;
import jsinterop.base.Js;

/**
* Factory for creating gRPC transports.
*/
@TsInterface
@JsType(namespace = "dh.grpc", isNative = true)
public interface GrpcTransportFactory {
/**
* Create a new transport instance.
*
* @param options options for creating the transport
* @return a transport instance to use for gRPC communication
*/
GrpcTransport create(GrpcTransportOptions options);

/**
* Return true to signal that created transports may have {@link GrpcTransport#sendMessage(Uint8Array)} called on it
* more than once before {@link GrpcTransport#finishSend()} should be called.
*
* @return true to signal that the implementation can stream multiple messages, false otherwise indicating that
* Open/Next gRPC calls should be used
*/
@JsProperty
boolean getSupportsClientStreaming();

/**
* Adapt this factory to the transport factory used by the gRPC-web library.
*/
@JsOverlay
default TransportFactory adapt() {
return options -> {
GrpcTransport impl = create(GrpcTransportOptions.from(options));
return new Transport() {
@Override
public void cancel() {
impl.cancel();
}

@Override
public void finishSend() {
impl.finishSend();
}

@Override
public void sendMessage(Uint8Array msgBytes) {
impl.sendMessage(msgBytes);
}

@Override
public void start(BrowserHeaders metadata) {
impl.start(Js.cast(metadata.headersMap));
}
};
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.web.client.api.grpc;

import com.vertispan.tsdefs.annotations.TsInterface;
import elemental2.core.JsError;
import elemental2.core.Uint8Array;
import elemental2.dom.URL;
import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders;
import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportOptions;
import jsinterop.annotations.JsFunction;
import jsinterop.annotations.JsIgnore;
import jsinterop.annotations.JsNullable;
import jsinterop.annotations.JsOptional;
import jsinterop.annotations.JsType;
import jsinterop.base.JsPropertyMap;

/**
* Options for creating a gRPC stream transport instance.
*/
@TsInterface
@JsType(namespace = "dh.grpc")
public class GrpcTransportOptions {
@JsFunction
@FunctionalInterface
public interface OnHeadersCallback {
void onHeaders(JsPropertyMap<HeaderValueUnion> headers, int status);
}

@JsFunction
@FunctionalInterface
public interface OnChunkCallback {
void onChunk(Uint8Array chunk);
}

@JsFunction
@FunctionalInterface
public interface OnEndCallback {
void onEnd(@JsOptional @JsNullable JsError error);
}

/**
* The gRPC method URL.
*/
public URL url;

/**
* True to enable debug logging for this stream.
*/
public boolean debug;

/**
* Callback for when headers and status are received. The headers are a map of header names to values, and the
* status is the HTTP status code. If the connection could not be made, the status should be 0.
*/
public OnHeadersCallback onHeaders;

/**
* Callback for when a chunk of data is received.
*/
public OnChunkCallback onChunk;

/**
* Callback for when the stream ends, with an error instance if it can be provided. Note that the present
* implementation does not consume errors, even if provided.
*/
public OnEndCallback onEnd;

/**
* Internal copy of options, to be used for fallback.
*/
@JsIgnore
public TransportOptions originalOptions;

/**
* Convert a {@link TransportOptions} instance to a {@link GrpcTransportOptions} instance.
*/
@JsIgnore
public static GrpcTransportOptions from(TransportOptions options) {
GrpcTransportOptions impl = new GrpcTransportOptions();
impl.url = new URL(options.getUrl());
impl.debug = options.isDebug();
impl.onHeaders = (headers, status) -> options.getOnHeaders().onInvoke(new BrowserHeaders(headers), status);
impl.onChunk = p0 -> {
// "false" because the underlying implementation doesn't rely on this anyway.
options.getOnChunk().onInvoke(p0, false);
};
impl.onEnd = options.getOnEnd()::onInvoke;
return impl;
}
}
Loading