From d10d596683d38369f0589358462e40a83fbc50c5 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 10 Dec 2024 15:45:15 -0600 Subject: [PATCH] fix: JS subscriptions should return correct size while updating (#6463) (#6464) Fixes #6423 Fixes DH-18128 Backport #6463 Backport #6473 --- .../io/deephaven/web/client/api/JsTable.java | 9 ++-- .../AbstractTableSubscription.java | 53 ++++++++++++++----- .../TableViewportSubscription.java | 7 ++- .../api/subscription/ViewportTestGwt.java | 29 ++++++++++ 4 files changed, 77 insertions(+), 21 deletions(-) diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/JsTable.java b/web/client-api/src/main/java/io/deephaven/web/client/api/JsTable.java index 04127edec70..9fb9e1c6c30 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/JsTable.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/JsTable.java @@ -452,7 +452,7 @@ public JsLayoutHints getLayoutHints() { @JsProperty public double getSize() { TableViewportSubscription subscription = subscriptions.get(getHandle()); - if (subscription != null && subscription.getStatus() == TableViewportSubscription.Status.ACTIVE) { + if (subscription != null && subscription.hasValidSize()) { // only ask the viewport for the size if it is alive and ticking return subscription.size(); } @@ -705,7 +705,7 @@ public TableViewportSubscription setViewport(double firstRow, double lastRow, Column[] columnsCopy = columns != null ? Js.uncheckedCast(columns.slice()) : state().getColumns(); ClientTableState currentState = state(); TableViewportSubscription activeSubscription = subscriptions.get(getHandle()); - if (activeSubscription != null && activeSubscription.getStatus() != TableViewportSubscription.Status.DONE) { + if (activeSubscription != null && !activeSubscription.isClosed()) { // hasn't finished, lets reuse it activeSubscription.setInternalViewport(firstRow, lastRow, columnsCopy, updateIntervalMs, isReverseViewport); return activeSubscription; @@ -1583,8 +1583,7 @@ public void setState(final ClientTableState state) { if (!isClosed() && was != null && was != state()) { // if we held a subscription TableViewportSubscription existingSubscription = subscriptions.remove(was.getHandle()); - if (existingSubscription != null - && existingSubscription.getStatus() != TableViewportSubscription.Status.DONE) { + if (existingSubscription != null && !existingSubscription.isClosed()) { JsLog.debug("closing old viewport", state(), existingSubscription.state()); // with the replacement state successfully running, we can shut down the old viewport (unless // something external retained it) @@ -1715,7 +1714,7 @@ public void setSize(double s) { this.size = s; TableViewportSubscription subscription = subscriptions.get(getHandle()); - if (changed && (subscription == null || subscription.getStatus() == TableViewportSubscription.Status.DONE)) { + if (changed && (subscription == null || !subscription.hasValidSize())) { // If the size changed, and we have no subscription active, fire. Otherwise, we want to let the // subscription itself manage this, so that the size changes are synchronized with data changes, // and consumers won't be confused by the table size not matching data. diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/AbstractTableSubscription.java b/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/AbstractTableSubscription.java index f231a93a071..7b27ad75b0f 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/AbstractTableSubscription.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/AbstractTableSubscription.java @@ -61,9 +61,11 @@ public abstract class AbstractTableSubscription extends HasEventHandling { */ public static final String EVENT_UPDATED = "updated"; - public enum Status { + protected enum Status { /** Waiting for some prerequisite before we can use it for the first time. */ STARTING, + /** All prerequisites are met, waiting for the first snapshot to be returned. */ + SUBSCRIPTION_REQUESTED, /** Successfully created, not waiting for any messages to be accurate. */ ACTIVE, /** Waiting for an update to return from being active to being active again. */ @@ -117,7 +119,11 @@ protected void revive() { WebBarrageSubscription.ViewportChangedHandler viewportChangedHandler = this::onViewportChange; WebBarrageSubscription.DataChangedHandler dataChangedHandler = this::onDataChanged; - status = Status.ACTIVE; + status = Status.SUBSCRIPTION_REQUESTED; + + // In order to create the subscription, we need to already have the table resolved, so we know if it + // is a blink table or not. In turn, we can't be prepared to handle any messages from the server until + // we know this, so we can't race messages with this design. this.barrageSubscription = WebBarrageSubscription.subscribe( subscriptionType, state, viewportChangedHandler, dataChangedHandler); @@ -139,10 +145,6 @@ protected void revive() { JsRunnable.doNothing()); } - public Status getStatus() { - return status; - } - protected static FlatBufferBuilder subscriptionRequest(byte[] tableTicket, BitSet columns, @Nullable RangeSet viewport, BarrageSubscriptionOptions options, boolean isReverseViewport) { @@ -168,16 +170,19 @@ protected static FlatBufferBuilder subscriptionRequest(byte[] tableTicket, BitSe protected abstract void sendFirstSubscriptionRequest(); - protected void sendBarrageSubscriptionRequest(RangeSet viewport, JsArray columns, Double updateIntervalMs, + protected void sendBarrageSubscriptionRequest(@Nullable RangeSet viewport, JsArray columns, + Double updateIntervalMs, boolean isReverseViewport) { - if (status == Status.DONE) { + if (isClosed()) { if (failMsg == null) { throw new IllegalStateException("Can't change subscription, already closed"); } else { throw new IllegalStateException("Can't change subscription, already failed: " + failMsg); } } - status = Status.PENDING_UPDATE; + if (status == Status.ACTIVE) { + status = Status.PENDING_UPDATE; + } this.columns = columns; this.viewportRowSet = viewport; this.columnBitSet = makeColumnBitset(columns); @@ -214,15 +219,39 @@ protected WorkerConnection connection() { return connection; } + /** + * True if the subscription is in the ACTIVE state, meaning that the server and client are in sync with the state of + * the subscription. + */ protected boolean isSubscriptionReady() { return status == Status.ACTIVE; } + /** + * Returns true if the subscription is closed and cannot be used again, false if it is actively listening for more + * data. + */ + public boolean isClosed() { + return status == Status.DONE; + } + + /** + * Returns true if the subscription is in a state where it can be used to read data, false if still waiting for the + * server to send the first snapshot or if the subscription has been closed. + * + * @return true if the {@link #size()} method will return data based on the subscription, false if some other source + * of the table's size will be used. + */ + public boolean hasValidSize() { + return status == Status.ACTIVE || status == Status.PENDING_UPDATE; + } + + public double size() { - if (status == Status.ACTIVE) { + if (hasValidSize()) { return barrageSubscription.getCurrentSize(); } - if (status == Status.DONE) { + if (isClosed()) { throw new IllegalStateException("Can't read size when already closed"); } return state.getSize(); @@ -505,7 +534,7 @@ private void onFlightData(FlightData data) { } protected void onStreamEnd(ResponseStreamWrapper.Status status) { - if (this.status == Status.DONE) { + if (isClosed()) { return; } if (status.isTransportError()) { diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java b/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java index c199be4d497..e9f1bd53233 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java @@ -248,6 +248,7 @@ public void setViewport(double firstRow, double lastRow, @JsOptional @JsNullable public void setInternalViewport(double firstRow, double lastRow, Column[] columns, Double updateIntervalMs, Boolean isReverseViewport) { + // Until we've created the stream, we just cache the requested viewport if (status == Status.STARTING) { this.firstRow = firstRow; this.lastRow = lastRow; @@ -281,7 +282,7 @@ public void setInternalViewport(double firstRow, double lastRow, Column[] column */ @JsMethod public void close() { - if (status == Status.DONE) { + if (isClosed()) { JsLog.warn("TableViewportSubscription.close called on subscription that's already done."); } retained = false; @@ -300,14 +301,12 @@ public void internalClose() { reconnectSubscription.remove(); - if (retained || status == Status.DONE) { + if (retained || isClosed()) { // the JsTable has indicated it is no longer interested in this viewport, but other calling // code has retained it, keep it open for now. return; } - status = Status.DONE; - super.close(); } diff --git a/web/client-api/src/test/java/io/deephaven/web/client/api/subscription/ViewportTestGwt.java b/web/client-api/src/test/java/io/deephaven/web/client/api/subscription/ViewportTestGwt.java index 0d89e2c7917..1b13e3e5052 100644 --- a/web/client-api/src/test/java/io/deephaven/web/client/api/subscription/ViewportTestGwt.java +++ b/web/client-api/src/test/java/io/deephaven/web/client/api/subscription/ViewportTestGwt.java @@ -93,6 +93,35 @@ public void testViewportOnStaticTable() { .then(this::finish).catch_(this::report); } + public void testChangePendingViewport() { + connect(tables) + .then(table("staticTable")) + .then(table -> { + delayTestFinish(5001); + + table.setViewport(0, 25, null); + assertEquals(100.0, table.getSize()); + return Promise.resolve(table); + }) + .then(table -> { + assertEquals(100.0, table.getSize()); + table.setViewport(5, 30, null); + assertEquals(100.0, table.getSize()); + return assertUpdateReceived(table, viewport -> { + assertEquals(100.0, table.getSize()); + + assertEquals(5d, viewport.getOffset()); + assertEquals(26, viewport.getRows().length); + }, 2500); + }) + .then(table -> { + assertEquals(100.0, table.getSize()); + table.close(); + return null; + }) + .then(this::finish).catch_(this::report); + } + // TODO: https://deephaven.atlassian.net/browse/DH-11196 public void ignore_testViewportOnGrowingTable() { connect(tables)