Skip to content

Commit

Permalink
fix: JS subscriptions should return correct size while updating (#6463)…
Browse files Browse the repository at this point in the history
… (#6464)

Fixes #6423
Fixes DH-18128
Backport #6463
Backport #6473
  • Loading branch information
niloc132 authored Dec 10, 2024
1 parent 21d9aee commit d10d596
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ public JsLayoutHints getLayoutHints() {
@JsProperty
public double getSize() {
TableViewportSubscription subscription = subscriptions.get(getHandle());
if (subscription != null && subscription.getStatus() == TableViewportSubscription.Status.ACTIVE) {
if (subscription != null && subscription.hasValidSize()) {
// only ask the viewport for the size if it is alive and ticking
return subscription.size();
}
Expand Down Expand Up @@ -705,7 +705,7 @@ public TableViewportSubscription setViewport(double firstRow, double lastRow,
Column[] columnsCopy = columns != null ? Js.uncheckedCast(columns.slice()) : state().getColumns();
ClientTableState currentState = state();
TableViewportSubscription activeSubscription = subscriptions.get(getHandle());
if (activeSubscription != null && activeSubscription.getStatus() != TableViewportSubscription.Status.DONE) {
if (activeSubscription != null && !activeSubscription.isClosed()) {
// hasn't finished, lets reuse it
activeSubscription.setInternalViewport(firstRow, lastRow, columnsCopy, updateIntervalMs, isReverseViewport);
return activeSubscription;
Expand Down Expand Up @@ -1583,8 +1583,7 @@ public void setState(final ClientTableState state) {
if (!isClosed() && was != null && was != state()) {
// if we held a subscription
TableViewportSubscription existingSubscription = subscriptions.remove(was.getHandle());
if (existingSubscription != null
&& existingSubscription.getStatus() != TableViewportSubscription.Status.DONE) {
if (existingSubscription != null && !existingSubscription.isClosed()) {
JsLog.debug("closing old viewport", state(), existingSubscription.state());
// with the replacement state successfully running, we can shut down the old viewport (unless
// something external retained it)
Expand Down Expand Up @@ -1715,7 +1714,7 @@ public void setSize(double s) {
this.size = s;

TableViewportSubscription subscription = subscriptions.get(getHandle());
if (changed && (subscription == null || subscription.getStatus() == TableViewportSubscription.Status.DONE)) {
if (changed && (subscription == null || !subscription.hasValidSize())) {
// If the size changed, and we have no subscription active, fire. Otherwise, we want to let the
// subscription itself manage this, so that the size changes are synchronized with data changes,
// and consumers won't be confused by the table size not matching data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ public abstract class AbstractTableSubscription extends HasEventHandling {
*/
public static final String EVENT_UPDATED = "updated";

public enum Status {
protected enum Status {
/** Waiting for some prerequisite before we can use it for the first time. */
STARTING,
/** All prerequisites are met, waiting for the first snapshot to be returned. */
SUBSCRIPTION_REQUESTED,
/** Successfully created, not waiting for any messages to be accurate. */
ACTIVE,
/** Waiting for an update to return from being active to being active again. */
Expand Down Expand Up @@ -117,7 +119,11 @@ protected void revive() {
WebBarrageSubscription.ViewportChangedHandler viewportChangedHandler = this::onViewportChange;
WebBarrageSubscription.DataChangedHandler dataChangedHandler = this::onDataChanged;

status = Status.ACTIVE;
status = Status.SUBSCRIPTION_REQUESTED;

// In order to create the subscription, we need to already have the table resolved, so we know if it
// is a blink table or not. In turn, we can't be prepared to handle any messages from the server until
// we know this, so we can't race messages with this design.
this.barrageSubscription = WebBarrageSubscription.subscribe(
subscriptionType, state, viewportChangedHandler, dataChangedHandler);

Expand All @@ -139,10 +145,6 @@ protected void revive() {
JsRunnable.doNothing());
}

public Status getStatus() {
return status;
}

protected static FlatBufferBuilder subscriptionRequest(byte[] tableTicket, BitSet columns,
@Nullable RangeSet viewport,
BarrageSubscriptionOptions options, boolean isReverseViewport) {
Expand All @@ -168,16 +170,19 @@ protected static FlatBufferBuilder subscriptionRequest(byte[] tableTicket, BitSe

protected abstract void sendFirstSubscriptionRequest();

protected void sendBarrageSubscriptionRequest(RangeSet viewport, JsArray<Column> columns, Double updateIntervalMs,
protected void sendBarrageSubscriptionRequest(@Nullable RangeSet viewport, JsArray<Column> columns,
Double updateIntervalMs,
boolean isReverseViewport) {
if (status == Status.DONE) {
if (isClosed()) {
if (failMsg == null) {
throw new IllegalStateException("Can't change subscription, already closed");
} else {
throw new IllegalStateException("Can't change subscription, already failed: " + failMsg);
}
}
status = Status.PENDING_UPDATE;
if (status == Status.ACTIVE) {
status = Status.PENDING_UPDATE;
}
this.columns = columns;
this.viewportRowSet = viewport;
this.columnBitSet = makeColumnBitset(columns);
Expand Down Expand Up @@ -214,15 +219,39 @@ protected WorkerConnection connection() {
return connection;
}

/**
* True if the subscription is in the ACTIVE state, meaning that the server and client are in sync with the state of
* the subscription.
*/
protected boolean isSubscriptionReady() {
return status == Status.ACTIVE;
}

/**
* Returns true if the subscription is closed and cannot be used again, false if it is actively listening for more
* data.
*/
public boolean isClosed() {
return status == Status.DONE;
}

/**
* Returns true if the subscription is in a state where it can be used to read data, false if still waiting for the
* server to send the first snapshot or if the subscription has been closed.
*
* @return true if the {@link #size()} method will return data based on the subscription, false if some other source
* of the table's size will be used.
*/
public boolean hasValidSize() {
return status == Status.ACTIVE || status == Status.PENDING_UPDATE;
}


public double size() {
if (status == Status.ACTIVE) {
if (hasValidSize()) {
return barrageSubscription.getCurrentSize();
}
if (status == Status.DONE) {
if (isClosed()) {
throw new IllegalStateException("Can't read size when already closed");
}
return state.getSize();
Expand Down Expand Up @@ -505,7 +534,7 @@ private void onFlightData(FlightData data) {
}

protected void onStreamEnd(ResponseStreamWrapper.Status status) {
if (this.status == Status.DONE) {
if (isClosed()) {
return;
}
if (status.isTransportError()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ public void setViewport(double firstRow, double lastRow, @JsOptional @JsNullable

public void setInternalViewport(double firstRow, double lastRow, Column[] columns, Double updateIntervalMs,
Boolean isReverseViewport) {
// Until we've created the stream, we just cache the requested viewport
if (status == Status.STARTING) {
this.firstRow = firstRow;
this.lastRow = lastRow;
Expand Down Expand Up @@ -281,7 +282,7 @@ public void setInternalViewport(double firstRow, double lastRow, Column[] column
*/
@JsMethod
public void close() {
if (status == Status.DONE) {
if (isClosed()) {
JsLog.warn("TableViewportSubscription.close called on subscription that's already done.");
}
retained = false;
Expand All @@ -300,14 +301,12 @@ public void internalClose() {

reconnectSubscription.remove();

if (retained || status == Status.DONE) {
if (retained || isClosed()) {
// the JsTable has indicated it is no longer interested in this viewport, but other calling
// code has retained it, keep it open for now.
return;
}

status = Status.DONE;

super.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,35 @@ public void testViewportOnStaticTable() {
.then(this::finish).catch_(this::report);
}

public void testChangePendingViewport() {
connect(tables)
.then(table("staticTable"))
.then(table -> {
delayTestFinish(5001);

table.setViewport(0, 25, null);
assertEquals(100.0, table.getSize());
return Promise.resolve(table);
})
.then(table -> {
assertEquals(100.0, table.getSize());
table.setViewport(5, 30, null);
assertEquals(100.0, table.getSize());
return assertUpdateReceived(table, viewport -> {
assertEquals(100.0, table.getSize());

assertEquals(5d, viewport.getOffset());
assertEquals(26, viewport.getRows().length);
}, 2500);
})
.then(table -> {
assertEquals(100.0, table.getSize());
table.close();
return null;
})
.then(this::finish).catch_(this::report);
}

// TODO: https://deephaven.atlassian.net/browse/DH-11196
public void ignore_testViewportOnGrowingTable() {
connect(tables)
Expand Down

0 comments on commit d10d596

Please sign in to comment.