diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java b/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java index 8bf5c5d10bc..16a3711974d 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/ConnectOptions.java @@ -3,7 +3,7 @@ // package io.deephaven.web.client.api; -import elemental2.core.Function; +import io.deephaven.web.client.api.grpc.GrpcTransportFactory; import jsinterop.annotations.JsIgnore; import jsinterop.annotations.JsNullable; import jsinterop.annotations.JsType; @@ -29,20 +29,23 @@ public class ConnectOptions { /** * Set this to true to force the use of websockets when connecting to the deephaven instance, false to force the use - * of {@code fetch}. + * of {@code fetch}. Ignored if {@link #transportFactory} is set. *

* Defaults to null, indicating that the server URL should be checked to see if we connect with fetch or websockets. */ @JsNullable public Boolean useWebsockets; - // TODO (deephaven-core#6214) provide our own grpc-web library that can replace fetch - // /** - // * Optional fetch implementation to use instead of the global {@code fetch()} call, allowing callers to provide a - // * polyfill rather than add a new global. - // */ - // @JsNullable - // public Function fetch; + /** + * The transport factory to use for creating gRPC streams. If specified, the JS API will ignore + * {@link #useWebsockets} and its own internal logic for determining the appropriate transport to use. + *

+ * Defaults to null, indicating that the JS API should determine the appropriate transport to use. If + * {@code useWebsockets} is set to true, the JS API will use websockets, otherwise if the server url begins with + * https, it will use fetch, otherwise it will use websockets. + */ + @JsNullable + public GrpcTransportFactory transportFactory; public ConnectOptions() { @@ -65,5 +68,8 @@ public ConnectOptions(Object connectOptions) { // if (map.has("fetch")) { // fetch = map.getAsAny("fetch").uncheckedCast(); // } + if (map.has("transportFactory")) { + transportFactory = map.getAsAny("transportFactory").uncheckedCast(); + } } } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java index 1394069e0b7..2a73b357a5e 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java @@ -7,11 +7,9 @@ import elemental2.core.JsArray; import elemental2.core.JsSet; import elemental2.promise.Promise; -import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc; import io.deephaven.javascript.proto.dhinternal.grpcweb.client.RpcOptions; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationResponse; import io.deephaven.web.client.api.event.HasEventHandling; -import io.deephaven.web.client.api.grpc.MultiplexedWebsocketTransport; import io.deephaven.web.client.ide.IdeSession; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.*; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.ticket_pb.Ticket; @@ -246,12 +244,8 @@ public void disconnected() { public abstract void notifyServerShutdown(TerminationNotificationResponse success); - public boolean useWebsockets() { - Boolean useWebsockets = getOptions().useWebsockets; - if (useWebsockets == null) { - useWebsockets = getServerUrl().startsWith("http:"); - } - return useWebsockets; + public boolean supportsClientStreaming() { + return getOptions().transportFactory.getSupportsClientStreaming(); } public T createClient(BiFunction constructor) { @@ -261,12 +255,7 @@ public T createClient(BiFunction constructor) { public RpcOptions makeRpcOptions() { RpcOptions options = RpcOptions.create(); options.setDebug(getOptions().debug); - if (useWebsockets()) { - // Replace with our custom websocket impl, with fallback to the built-in one - options.setTransport(o -> new MultiplexedWebsocketTransport(o, () -> { - Grpc.setDefaultTransport.onInvoke(Grpc.WebsocketTransport.onInvoke()); - })); - } + options.setTransport(getOptions().transportFactory.adapt()); return options; } } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java index f8212bb05ac..4ea02498a51 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java @@ -986,7 +986,7 @@ public BrowserHeaders metadata() { } public BiDiStream.Factory streamFactory() { - return new BiDiStream.Factory<>(info.useWebsockets(), this::metadata, config::newTicketInt); + return new BiDiStream.Factory<>(info.supportsClientStreaming(), this::metadata, config::newTicketInt); } public Promise newTable(String[] columnNames, String[] types, Object[][] data, String userTimeZone, diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/BiDiStream.java b/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/BiDiStream.java index 5febd0a9fa4..4fc65739780 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/BiDiStream.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/BiDiStream.java @@ -36,12 +36,12 @@ public interface NextStreamMessageFactory { void nextStreamMessage(Req nextPayload, BrowserHeaders headers, JsBiConsumer callback); } public static class Factory { - private final boolean useWebsockets; + private final boolean supportsClientStreaming; private final Supplier headers; private final IntSupplier nextIntTicket; - public Factory(boolean useWebsockets, Supplier headers, IntSupplier nextIntTicket) { - this.useWebsockets = useWebsockets; + public Factory(boolean supportsClientStreaming, Supplier headers, IntSupplier nextIntTicket) { + this.supportsClientStreaming = supportsClientStreaming; this.headers = headers; this.nextIntTicket = nextIntTicket; } @@ -51,8 +51,8 @@ public BiDiStream create( OpenStreamFactory openEmulatedStream, NextStreamMessageFactory nextEmulatedStream, ReqT emptyReq) { - if (useWebsockets) { - return websocket(bidirectionalStream.openBiDiStream(headers.get())); + if (supportsClientStreaming) { + return bidi(bidirectionalStream.openBiDiStream(headers.get())); } else { return new EmulatedBiDiStream<>( openEmulatedStream, @@ -73,7 +73,7 @@ public static BiDiStream of( IntSupplier nextIntTicket, boolean useWebsocket) { if (useWebsocket) { - return websocket(bidirectionalStream.openBiDiStream(headers.get())); + return bidi(bidirectionalStream.openBiDiStream(headers.get())); } else { return new EmulatedBiDiStream<>( openEmulatedStream, @@ -84,7 +84,7 @@ public static BiDiStream of( } } - public static BiDiStream websocket(Object bidirectionalStream) { + public static BiDiStream bidi(Object bidirectionalStream) { return new WebsocketBiDiStream<>(Js.cast(bidirectionalStream)); } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java new file mode 100644 index 00000000000..d51bfcb33d3 --- /dev/null +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransport.java @@ -0,0 +1,74 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.web.client.api.grpc; + +import com.vertispan.tsdefs.annotations.TsInterface; +import elemental2.core.Uint8Array; +import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; +import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Transport; +import jsinterop.annotations.JsIgnore; +import jsinterop.annotations.JsType; +import jsinterop.base.JsPropertyMap; + +/** + * gRPC transport implementation. + * + */ +@JsType(namespace = "dh.grpc") +@TsInterface +public interface GrpcTransport { + /** + * Starts the stream, sending metadata to the server. + * + * @param metadata the headers to send the server when opening the connection + */ + void start(JsPropertyMap metadata); + + /** + * Sends a message to the server. + * + * @param msgBytes bytes to send to the server + */ + void sendMessage(Uint8Array msgBytes); + + /** + * "Half close" the stream, signaling to the server that no more messages will be sent, but that the client is still + * open to receiving messages. + */ + void finishSend(); + + /** + * End the stream, both notifying the server that no more messages will be sent nor received, and preventing the + * client from receiving any more events. + */ + void cancel(); + + /** + * Helper to transform ts implementations to our own api. + */ + @JsIgnore + static GrpcTransport from(Transport tsTransport) { + return new GrpcTransport() { + @Override + public void start(JsPropertyMap metadata) { + tsTransport.start(new BrowserHeaders(metadata)); + } + + @Override + public void sendMessage(Uint8Array msgBytes) { + tsTransport.sendMessage(msgBytes); + } + + @Override + public void finishSend() { + tsTransport.finishSend(); + } + + @Override + public void cancel() { + tsTransport.cancel(); + } + }; + } +} diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java new file mode 100644 index 00000000000..978e1a36426 --- /dev/null +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportFactory.java @@ -0,0 +1,70 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.web.client.api.grpc; + +import com.vertispan.tsdefs.annotations.TsInterface; +import elemental2.core.Uint8Array; +import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.Transport; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportFactory; +import jsinterop.annotations.JsOverlay; +import jsinterop.annotations.JsProperty; +import jsinterop.annotations.JsType; +import jsinterop.base.Js; + +/** + * Factory for creating gRPC transports. + */ +@TsInterface +@JsType(namespace = "dh.grpc", isNative = true) +public interface GrpcTransportFactory { + /** + * Create a new transport instance. + * + * @param options options for creating the transport + * @return a transport instance to use for gRPC communication + */ + GrpcTransport create(GrpcTransportOptions options); + + /** + * Return true to signal that created transports may have {@link GrpcTransport#sendMessage(Uint8Array)} called on it + * more than once before {@link GrpcTransport#finishSend()} should be called. + * + * @return true to signal that the implementation can stream multiple messages, false otherwise indicating that + * Open/Next gRPC calls should be used + */ + @JsProperty + boolean getSupportsClientStreaming(); + + /** + * Adapt this factory to the transport factory used by the gRPC-web library. + */ + @JsOverlay + default TransportFactory adapt() { + return options -> { + GrpcTransport impl = create(GrpcTransportOptions.from(options)); + return new Transport() { + @Override + public void cancel() { + impl.cancel(); + } + + @Override + public void finishSend() { + impl.finishSend(); + } + + @Override + public void sendMessage(Uint8Array msgBytes) { + impl.sendMessage(msgBytes); + } + + @Override + public void start(BrowserHeaders metadata) { + impl.start(Js.cast(metadata.headersMap)); + } + }; + }; + } +} diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java new file mode 100644 index 00000000000..8853471a98e --- /dev/null +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/GrpcTransportOptions.java @@ -0,0 +1,92 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.web.client.api.grpc; + +import com.vertispan.tsdefs.annotations.TsInterface; +import elemental2.core.JsError; +import elemental2.core.Uint8Array; +import elemental2.dom.URL; +import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportOptions; +import jsinterop.annotations.JsFunction; +import jsinterop.annotations.JsIgnore; +import jsinterop.annotations.JsNullable; +import jsinterop.annotations.JsOptional; +import jsinterop.annotations.JsType; +import jsinterop.base.JsPropertyMap; + +/** + * Options for creating a gRPC stream transport instance. + */ +@TsInterface +@JsType(namespace = "dh.grpc") +public class GrpcTransportOptions { + @JsFunction + @FunctionalInterface + public interface OnHeadersCallback { + void onHeaders(JsPropertyMap headers, int status); + } + + @JsFunction + @FunctionalInterface + public interface OnChunkCallback { + void onChunk(Uint8Array chunk); + } + + @JsFunction + @FunctionalInterface + public interface OnEndCallback { + void onEnd(@JsOptional @JsNullable JsError error); + } + + /** + * The gRPC method URL. + */ + public URL url; + + /** + * True to enable debug logging for this stream. + */ + public boolean debug; + + /** + * Callback for when headers and status are received. The headers are a map of header names to values, and the + * status is the HTTP status code. If the connection could not be made, the status should be 0. + */ + public OnHeadersCallback onHeaders; + + /** + * Callback for when a chunk of data is received. + */ + public OnChunkCallback onChunk; + + /** + * Callback for when the stream ends, with an error instance if it can be provided. Note that the present + * implementation does not consume errors, even if provided. + */ + public OnEndCallback onEnd; + + /** + * Internal copy of options, to be used for fallback. + */ + @JsIgnore + public TransportOptions originalOptions; + + /** + * Convert a {@link TransportOptions} instance to a {@link GrpcTransportOptions} instance. + */ + @JsIgnore + public static GrpcTransportOptions from(TransportOptions options) { + GrpcTransportOptions impl = new GrpcTransportOptions(); + impl.url = new URL(options.getUrl()); + impl.debug = options.isDebug(); + impl.onHeaders = (headers, status) -> options.getOnHeaders().onInvoke(new BrowserHeaders(headers), status); + impl.onChunk = p0 -> { + // "false" because the underlying implementation doesn't rely on this anyway. + options.getOnChunk().onInvoke(p0, false); + }; + impl.onEnd = options.getOnEnd()::onInvoke; + return impl; + } +} diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java new file mode 100644 index 00000000000..9d64a7c0402 --- /dev/null +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/HeaderValueUnion.java @@ -0,0 +1,42 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.web.client.api.grpc; + +import com.vertispan.tsdefs.annotations.TsUnion; +import com.vertispan.tsdefs.annotations.TsUnionMember; +import elemental2.core.JsArray; +import javaemul.internal.annotations.DoNotAutobox; +import jsinterop.annotations.JsOverlay; +import jsinterop.annotations.JsPackage; +import jsinterop.annotations.JsType; +import jsinterop.base.Js; + +/** + * Union of string and array of string, as node/browser APIs tend to accept either for http headers. + */ +@TsUnion +@JsType(name = "?", namespace = JsPackage.GLOBAL, isNative = true) +public interface HeaderValueUnion { + @JsOverlay + static HeaderValueUnion of(@DoNotAutobox Object value) { + return Js.cast(value); + } + + @JsOverlay + default boolean isArray() { + return JsArray.isArray(this); + } + + @TsUnionMember + @JsOverlay + default String asString() { + return Js.cast(this); + } + + @TsUnionMember + @JsOverlay + default JsArray asArray() { + return Js.cast(this); + } +} diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java index dfb0fd7cea4..a40cf532e1b 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/grpc/MultiplexedWebsocketTransport.java @@ -17,10 +17,10 @@ import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc; import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.Transport; -import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportOptions; import io.deephaven.web.client.api.JsLazy; import io.deephaven.web.shared.fu.JsRunnable; import jsinterop.base.Js; +import jsinterop.base.JsPropertyMap; import java.util.ArrayList; import java.util.HashMap; @@ -32,11 +32,23 @@ * equal, this transport should be preferred to the default grpc-websockets transport, and in turn the fetch based * transport is usually superior to this. */ -public class MultiplexedWebsocketTransport implements Transport { +public class MultiplexedWebsocketTransport implements GrpcTransport { public static final String MULTIPLEX_PROTOCOL = "grpc-websockets-multiplex"; public static final String SOCKET_PER_STREAM_PROTOCOL = "grpc-websockets"; + public static class Factory implements GrpcTransportFactory { + @Override + public GrpcTransport create(GrpcTransportOptions options) { + return new MultiplexedWebsocketTransport(options); + } + + @Override + public boolean getSupportsClientStreaming() { + return true; + } + } + private static Uint8Array encodeASCII(String str) { Uint8Array encoded = new Uint8Array(str.length()); for (int i = 0; i < str.length(); i++) { @@ -55,9 +67,9 @@ private interface QueuedEntry { public static class HeaderFrame implements QueuedEntry { private final String path; - private final BrowserHeaders metadata; + private final JsPropertyMap metadata; - public HeaderFrame(String path, BrowserHeaders metadata) { + public HeaderFrame(String path, JsPropertyMap metadata) { this.path = path; this.metadata = metadata; } @@ -66,9 +78,14 @@ public HeaderFrame(String path, BrowserHeaders metadata) { public void send(WebSocket webSocket, int streamId) { final Uint8Array headerBytes; final StringBuilder str = new StringBuilder(); - metadata.append("grpc-websockets-path", path); - metadata.forEach((key, value) -> { - str.append(key).append(": ").append(value.join(", ")).append("\r\n"); + metadata.set("grpc-websockets-path", HeaderValueUnion.of(path)); + metadata.forEach((key) -> { + HeaderValueUnion value = metadata.get(key); + if (value.isArray()) { + str.append(key).append(": ").append(value.asArray().join(", ")).append("\r\n"); + } else { + str.append(key).append(": ").append(value.asString()).append("\r\n"); + } }); headerBytes = encodeASCII(str.toString()); Int8Array payload = new Int8Array(headerBytes.byteLength + 4); @@ -79,7 +96,7 @@ public void send(WebSocket webSocket, int streamId) { @Override public void sendFallback(Transport transport) { - transport.start(metadata); + transport.start(new BrowserHeaders(metadata)); } } @@ -201,16 +218,16 @@ private void release() { private ActiveTransport transport; private final int streamId = nextStreamId++; private final List sendQueue = new ArrayList<>(); - private final TransportOptions options; + private final GrpcTransportOptions options; private final String path; private final JsLazy alternativeTransport; private JsRunnable cleanup = JsRunnable.doNothing(); - public MultiplexedWebsocketTransport(TransportOptions options, JsRunnable avoidMultiplexCallback) { + public MultiplexedWebsocketTransport(GrpcTransportOptions options) { this.options = options; - String url = options.getUrl(); + String url = options.url.toString(); URL urlWrapper = new URL(url); // preserve the path to send as metadata, but still talk to the server with that path path = urlWrapper.pathname.substring(1); @@ -220,16 +237,13 @@ public MultiplexedWebsocketTransport(TransportOptions options, JsRunnable avoidM transport = ActiveTransport.get(url); // prepare a fallback - alternativeTransport = new JsLazy<>(() -> { - avoidMultiplexCallback.run(); - return Grpc.WebsocketTransport.onInvoke().onInvoke(options); - }); + alternativeTransport = new JsLazy<>(() -> Grpc.WebsocketTransport.onInvoke().onInvoke(options.originalOptions)); } @Override - public void start(BrowserHeaders metadata) { + public void start(JsPropertyMap metadata) { if (alternativeTransport.isAvailable()) { - alternativeTransport.get().start(metadata); + alternativeTransport.get().start(new BrowserHeaders(metadata)); return; } this.transport.retain(); @@ -325,7 +339,7 @@ private void onClose(Event event) { return; } // each grpc transport will handle this as an error - options.getOnEnd().onInvoke(new JsError("Unexpectedly closed " + Js.uncheckedCast(event).reason)); + options.onEnd.onEnd(new JsError("Unexpectedly closed " + Js.uncheckedCast(event).reason)); removeHandlers(); } @@ -345,9 +359,9 @@ private void onMessage(Event event) { closed = false; } if (streamId == this.streamId) { - options.getOnChunk().onInvoke(new Uint8Array(messageEvent.data, 4), false); + options.onChunk.onChunk(new Uint8Array(messageEvent.data, 4)); if (closed) { - options.getOnEnd().onInvoke(null); + options.onEnd.onEnd(null); removeHandlers(); } } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java index bea6aacbba5..1ea9412d769 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/ide/IdeConnection.java @@ -7,7 +7,10 @@ import elemental2.core.JsArray; import elemental2.promise.Promise; import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders; +import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc; import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Code; +import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Transport; +import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportOptions; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationResponse; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.terminationnotificationresponse.StackTrace; import io.deephaven.web.client.api.ConnectOptions; @@ -16,6 +19,10 @@ import io.deephaven.web.client.api.barrage.stream.ResponseStreamWrapper; import io.deephaven.web.client.api.console.JsVariableChanges; import io.deephaven.web.client.api.console.JsVariableDescriptor; +import io.deephaven.web.client.api.grpc.GrpcTransport; +import io.deephaven.web.client.api.grpc.GrpcTransportFactory; +import io.deephaven.web.client.api.grpc.GrpcTransportOptions; +import io.deephaven.web.client.api.grpc.MultiplexedWebsocketTransport; import io.deephaven.web.shared.data.ConnectToken; import io.deephaven.web.shared.fu.JsConsumer; import io.deephaven.web.shared.fu.JsRunnable; @@ -57,6 +64,26 @@ public IdeConnection(String serverUrl, Object connectOptions) { } else { options = new ConnectOptions(); } + if (options.transportFactory == null) { + // assign a default transport factory + if (options.useWebsockets == Boolean.TRUE || !serverUrl.startsWith("https:")) { + options.transportFactory = new MultiplexedWebsocketTransport.Factory(); + } else { + options.transportFactory = new GrpcTransportFactory() { + @Override + public GrpcTransport create(GrpcTransportOptions options) { + return GrpcTransport + .from((Transport) Grpc.FetchReadableStreamTransport.onInvoke(new Object()) + .onInvoke((TransportOptions) options)); + } + + @Override + public boolean getSupportsClientStreaming() { + return false; + } + }; + } + } } @Override diff --git a/web/client-api/src/test/java/io/deephaven/web/ClientIntegrationTestSuite.java b/web/client-api/src/test/java/io/deephaven/web/ClientIntegrationTestSuite.java index 3c1887e8a5c..3c3aa8791ca 100644 --- a/web/client-api/src/test/java/io/deephaven/web/ClientIntegrationTestSuite.java +++ b/web/client-api/src/test/java/io/deephaven/web/ClientIntegrationTestSuite.java @@ -5,6 +5,7 @@ import com.google.gwt.junit.tools.GWTTestSuite; import io.deephaven.web.client.api.*; +import io.deephaven.web.client.api.grpc.GrpcTransportTestGwt; import io.deephaven.web.client.api.storage.JsStorageServiceTestGwt; import io.deephaven.web.client.api.subscription.ConcurrentTableTestGwt; import io.deephaven.web.client.api.subscription.ViewportTestGwt; @@ -30,6 +31,7 @@ public static Test suite() { suite.addTestSuite(JsStorageServiceTestGwt.class); suite.addTestSuite(InputTableTestGwt.class); suite.addTestSuite(ColumnStatisticsTestGwt.class); + suite.addTestSuite(GrpcTransportTestGwt.class); // This should be a unit test, but it requires a browser environment to run on GWT 2.9 // GWT 2.9 doesn't have proper bindings for Promises in HtmlUnit, so we need to use the IntegrationTest suite diff --git a/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportTestGwt.java b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportTestGwt.java new file mode 100644 index 00000000000..020c65b771a --- /dev/null +++ b/web/client-api/src/test/java/io/deephaven/web/client/api/grpc/GrpcTransportTestGwt.java @@ -0,0 +1,143 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.web.client.api.grpc; + +import elemental2.promise.Promise; +import io.deephaven.web.client.api.AbstractAsyncGwtTestCase; +import io.deephaven.web.client.api.ConnectOptions; +import io.deephaven.web.client.api.CoreClient; +import jsinterop.base.JsPropertyMap; + +/** + * Simple test to verify we can produce custom transports in JS. Only works with https, which means it can only be run + * manually at this time, or it will trivially succeed. + */ +public class GrpcTransportTestGwt extends AbstractAsyncGwtTestCase { + @Override + public String getModuleName() { + return "io.deephaven.web.DeephavenIntegrationTest"; + } + + /** + * Simple fetch impl, with no cancelation handling. + */ + public native GrpcTransportFactory makeFetchTransportFactory() /*-{ + return { + create: function(options) { + function pump(reader, res) { + reader.read().then(function(result) { + if (result.done) { + options.onEnd(); + } else { + options.onChunk(result.value); + pump(reader, res); + } + })['catch'](function(e) { + options.onEnd(e); + }); + } + return { + start: function(metadata) { + this.metadata = metadata; + }, + sendMessage: function(msgBytes) { + var fetchInit = { + headers: new Headers(this.metadata), + method: "POST", + body: msgBytes, + }; + $wnd.fetch(options.url.href, fetchInit).then(function(response) { + var m = {}; + response.headers.forEach(function(value, key) { + m[key] = value; + }); + options.onHeaders(m, response.status); + if (response.body) { + pump(response.body.getReader(), response); + } + return response; + })['catch'](function(e) { + options.onEnd(e); + }); + }, + finishSend: function() { + // no-op + }, + cancel: function() { + // no-op + } + }; + }, + supportsClientStreaming: false + }; + }-*/; + + public void testFetchGrpcTransport() { + if (!localServer.startsWith("https:")) { + // We're using h2, so we need to be on https for our current implementation + return; + } + setupDhInternal().then(ignore -> { + delayTestFinish(7101); + ConnectOptions connectOptions = new ConnectOptions(); + connectOptions.transportFactory = makeFetchTransportFactory(); + CoreClient coreClient = new CoreClient(localServer, connectOptions); + return coreClient.login(JsPropertyMap.of("type", CoreClient.LOGIN_TYPE_ANONYMOUS)) + .then(ignore2 -> Promise.resolve(coreClient)); + }).then(this::finish).catch_(this::report); + } + + /** + * Dummy transport that just sends a single message and receives a single message. Doesn't actually talk to the + * server, headers are empty, and the message is always 5 byte proto payload "no data", followed by trailers + * signifying success. + */ + private native GrpcTransportFactory makeDummyTransportFactory() /*-{ + return { + create: function(options) { + return { + start: function(metadata) { + // empty headers + $wnd.setTimeout(function() {options.onHeaders({}, 200);}, 0); + }, + sendMessage: function(msgBytes) { + // empty payload + var empty = new $wnd.Uint8Array(5); + // successful trailer payload + var trailersString = 'grpc-status:0'; + var successTrailers = new $wnd.Uint8Array(5 + trailersString.length); + successTrailers[0] = 128; + successTrailers[4] = trailersString.length; + new $wnd.TextEncoder('utf-8').encodeInto(trailersString, successTrailers.subarray(5)); + $wnd.setTimeout(function() { + // delay a bit, then send the empty messages and end the stream + options.onChunk(empty); + options.onChunk(successTrailers); + options.onEnd(); + }, 0); + }, + finishSend: function() { + // no-op + }, + cancel: function() { + // no-op + } + }; + }, + supportsClientStreaming: true + }; + }-*/; + + public void testDummyGrpcTransport() { + setupDhInternal().then(ignore -> { + delayTestFinish(7102); + ConnectOptions connectOptions = new ConnectOptions(); + connectOptions.transportFactory = makeDummyTransportFactory(); + connectOptions.debug = true; + CoreClient coreClient = new CoreClient(localServer, connectOptions); + return coreClient.login(JsPropertyMap.of("type", CoreClient.LOGIN_TYPE_ANONYMOUS)) + .then(ignore2 -> Promise.resolve(coreClient)); + }).then(this::finish).catch_(this::report); + } +} diff --git a/web/client-backplane/src/main/java/io/deephaven/javascript/proto/dhinternal/grpcweb/transports/transport/TransportOptions.java b/web/client-backplane/src/main/java/io/deephaven/javascript/proto/dhinternal/grpcweb/transports/transport/TransportOptions.java index 7bb39eda843..d9162713094 100644 --- a/web/client-backplane/src/main/java/io/deephaven/javascript/proto/dhinternal/grpcweb/transports/transport/TransportOptions.java +++ b/web/client-backplane/src/main/java/io/deephaven/javascript/proto/dhinternal/grpcweb/transports/transport/TransportOptions.java @@ -33,7 +33,7 @@ public interface OnEndFn { @JsFunction public interface OnHeadersFn { - void onInvoke(BrowserHeaders p0, double p1); + void onInvoke(BrowserHeaders p0, int p1); } @JsOverlay