fix: Better chunk cleanup on StreamToBlinkTableAdapter.destroy, and a few listener.destroy fixes (#6406)
rcaudy authored Nov 22, 2024
1 parent ecdc8e7 commit 52af771
Showing 13 changed files with 89 additions and 17 deletions.
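The listener-side changes all follow one pattern: resource cleanup that used to run directly in destroy() is now deferred until the update graph is idle. Below is a minimal sketch of that pattern in isolation; DeferredCleanupSketch, stampKeys, and stampValues are hypothetical stand-ins for the listeners and resources in the hunks that follow, and runWhenIdle is the UpdateGraph default method this commit adds (last file of the diff).

import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;

final class DeferredCleanupSketch {

    private final UpdateGraph updateGraph;
    private final SafeCloseable stampKeys;
    private final SafeCloseable stampValues;

    DeferredCleanupSketch(
            final UpdateGraph updateGraph,
            final SafeCloseable stampKeys,
            final SafeCloseable stampValues) {
        this.updateGraph = updateGraph;
        this.stampKeys = stampKeys;
        this.stampValues = stampValues;
    }

    void destroy() {
        // destroy() may run while an update cycle is still in flight; deferring the
        // closes until the graph is idle keeps the resources valid for the rest of
        // that cycle instead of closing them out from under it.
        updateGraph.runWhenIdle(() -> {
            stampKeys.close();
            stampValues.close();
        });
    }
}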
(changed file 1 of 13; file name not shown)
@@ -1340,8 +1340,10 @@ public void onUpdate(TableUpdate upstream) {
@Override
protected void destroy() {
super.destroy();
leftStampKeys.close();
leftStampValues.close();
getUpdateGraph().runWhenIdle(() -> {
leftStampKeys.close();
leftStampValues.close();
});
}
});

@@ -1522,8 +1524,10 @@ public void onUpdate(TableUpdate upstream) {
@Override
protected void destroy() {
super.destroy();
compactedRightStampKeys.close();
compactedRightStampValues.close();
getUpdateGraph().runWhenIdle(() -> {
compactedRightStampKeys.close();
compactedRightStampValues.close();
});
}
});

(changed file 2 of 13; file name not shown)
@@ -791,7 +791,9 @@ private RowSet indexFromBuilder(int slotIndex) {
@Override
protected void destroy() {
super.destroy();
leftSsaFactory.close();
rightSsaFactory.close();
getUpdateGraph().runWhenIdle(() -> {
leftSsaFactory.close();
rightSsaFactory.close();
});
}
}
(changed file 3 of 13; file name not shown)
@@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
(changed file 4 of 13; file name not shown)
@@ -185,7 +185,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
(changed file 5 of 13; file name not shown)
@@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
(changed file 6 of 13; file name not shown)
@@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
(changed file 7 of 13; file name not shown)
@@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
(changed file 8 of 13; file name not shown)
@@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
(changed file 9 of 13; file name not shown)
@@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
(changed file 10 of 13; file name not shown)
@@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
(changed file 11 of 13; file name not shown)
@@ -19,16 +19,16 @@
import io.deephaven.engine.table.impl.sources.LongAsInstantColumnSource;
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.engine.table.impl.sources.SwitchColumnSource;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.engine.table.impl.sources.chunkcolumnsource.ChunkColumnSource;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.MultiException;
@@ -40,11 +40,13 @@
import java.lang.ref.WeakReference;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

@@ -271,6 +273,15 @@ private static void maybeClearChunkColumnSource(ColumnSource<?> cs) {
}
}

private synchronized void clearChunkColumnSources() {
SafeCloseable.closeAll(
Stream.of(bufferChunkSources, currentChunkSources, prevChunkSources)
.filter(Objects::nonNull)
.flatMap(Arrays::stream)
.map(ccs -> ccs::clear));
bufferChunkSources = currentChunkSources = prevChunkSources = null;
}

/**
* Return the {@link Table#BLINK_TABLE_ATTRIBUTE blink} {@link Table table} that this adapter is producing, and
* ensure that this StreamToBlinkTableAdapter no longer enforces strong reachability of the result. May return
@@ -306,6 +317,7 @@ public void close() {
.endl();
updateSourceRegistrar.removeSource(this);
streamPublisher.shutdown();
getUpdateGraph().runWhenIdle(this::clearChunkColumnSources);
}
}

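The new clearChunkColumnSources helper above relies on SafeCloseable having a single close() method, so a no-arg clear() can be adapted with a method reference and handed to SafeCloseable.closeAll, which attempts every element even if an earlier one fails. A stripped-down sketch of the same trick follows; clearAll, bufferSources, and currentSources are hypothetical stand-ins for the adapter's fields.

import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Stream;

import io.deephaven.engine.table.impl.sources.chunkcolumnsource.ChunkColumnSource;
import io.deephaven.util.SafeCloseable;

final class ClearAllSketch {

    private ClearAllSketch() {}

    static void clearAll(
            final ChunkColumnSource<?>[] bufferSources,
            final ChunkColumnSource<?>[] currentSources) {
        SafeCloseable.closeAll(
                Stream.of(bufferSources, currentSources)
                        // Arrays that were never allocated, or were already cleared, may be null.
                        .filter(Objects::nonNull)
                        .flatMap(Arrays::stream)
                        // clear() takes no arguments, so the method reference satisfies
                        // SafeCloseable's single close() method.
                        .map(ccs -> (SafeCloseable) ccs::clear));
    }
}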
(changed file 12 of 13; file name not shown)
@@ -4,7 +4,11 @@
package io.deephaven.stream;

import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.ChunkPoolConstants;
import io.deephaven.chunk.util.pools.ChunkPoolReleaseTracking;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.Table;
@@ -18,6 +22,7 @@
import io.deephaven.engine.table.impl.SimpleListener;
import io.deephaven.chunk.*;
import io.deephaven.util.BooleanUtils;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.type.ArrayTypeUtils;
import junit.framework.TestCase;
import org.apache.commons.lang3.mutable.MutableBoolean;
@@ -446,6 +451,34 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) {
TestCase.assertTrue(listenerFailed.booleanValue());
}

@Test
public void testCleanup() {
final TableDefinition tableDefinition = TableDefinition.from(
List.of("O", "B", "S", "I", "L", "F", "D", "C"),
List.of(String.class, byte.class, short.class, int.class, long.class, float.class, double.class,
char.class));
final Table tableToAdd = emptyTable(ChunkPoolConstants.SMALLEST_POOLED_CHUNK_CAPACITY).updateView(
"O=Long.toString(ii)", "B=(byte)ii", "S=(short)ii", "I=(int)ii", "L=ii", "F=(float)ii",
"D=(double)ii", "C=(char)ii");
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
try (final SafeCloseable ignored = LivenessScopeStack.open(new LivenessScope(true), true)) {
final TablePublisher tablePublisher = TablePublisher.of("Test", tableDefinition, null, null);
// Add buffered chunks
tablePublisher.add(tableToAdd);
// Move buffered chunks to current
updateGraph.runWithinUnitTestCycle(() -> {
});
// Add more buffered chunks
tablePublisher.add(tableToAdd);
// Move current to previous, buffered to current
updateGraph.runWithinUnitTestCycle(() -> {
});
// Add even more buffered chunks
tablePublisher.add(tableToAdd);
}
ChunkPoolReleaseTracking.check();
}

private static class DummyStreamPublisher implements StreamPublisher {

private boolean fail;
(changed file 13 of 13; file name not shown)
@@ -6,6 +6,7 @@
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.io.log.LogEntry;
import io.deephaven.util.annotations.FinalDefault;
import io.deephaven.util.function.ThrowingSupplier;
import io.deephaven.util.locks.AwareFunctionalLock;
import org.jetbrains.annotations.NotNull;
@@ -244,4 +245,24 @@ public LogOutput append(LogOutput output) {
}

// endregion refresh control

/**
* Run {@code task} immediately if this UpdateGraph is currently idle, else schedule {@code task} to run at a later
* time when it has become idle.
*
* @param task The task to run when idle
*/
@FinalDefault
default void runWhenIdle(@NotNull final Runnable task) {
if (clock().currentState() == LogicalClock.State.Idle) {
task.run();
} else {
addNotification(new TerminalNotification() {
@Override
public void run() {
task.run();
}
});
}
}
}
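From a caller's point of view the new default method is a conditional dispatch: if the logical clock is Idle the task runs inline on the calling thread, otherwise it is wrapped in a TerminalNotification and runs once the in-flight cycle finishes. A short usage sketch, assuming an ExecutionContext with an update graph is already installed (as in the testCleanup test above):

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.updategraph.UpdateGraph;

public final class RunWhenIdleUsageSketch {
    public static void main(final String[] args) {
        // Assumes this runs where an ExecutionContext is installed, e.g. inside a
        // Deephaven worker; this is a sketch, not a bootstrap recipe.
        final UpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph();

        // Idle graph: the lambda runs before runWhenIdle returns.
        // Graph mid-cycle: the lambda runs after the current cycle completes.
        updateGraph.runWhenIdle(() -> System.out.println("update graph is idle; safe to release pooled chunks"));
    }
}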
