From 52af7713627d5fe08a41626ed5f2fb364eb500af Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Fri, 22 Nov 2024 17:35:55 -0500 Subject: [PATCH] fix: Better chunk cleanup on StreamToBlinkTableAdapter.destroy, and a few listener.destroy fixes (#6406) --- .../engine/table/impl/AsOfJoinHelper.java | 12 ++++--- .../BucketedChunkedAjMergedListener.java | 6 ++-- .../ByteChunkColumnSource.java | 2 +- .../CharChunkColumnSource.java | 2 +- .../DoubleChunkColumnSource.java | 2 +- .../FloatChunkColumnSource.java | 2 +- .../IntChunkColumnSource.java | 2 +- .../LongChunkColumnSource.java | 2 +- .../ObjectChunkColumnSource.java | 2 +- .../ShortChunkColumnSource.java | 2 +- .../stream/StreamToBlinkTableAdapter.java | 18 ++++++++-- .../stream/TestStreamToBlinkTableAdapter.java | 33 +++++++++++++++++++ .../engine/updategraph/UpdateGraph.java | 21 ++++++++++++ 13 files changed, 89 insertions(+), 17 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java index 92ee2074d11..47df5e98c26 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java @@ -1340,8 +1340,10 @@ public void onUpdate(TableUpdate upstream) { @Override protected void destroy() { super.destroy(); - leftStampKeys.close(); - leftStampValues.close(); + getUpdateGraph().runWhenIdle(() -> { + leftStampKeys.close(); + leftStampValues.close(); + }); } }); @@ -1522,8 +1524,10 @@ public void onUpdate(TableUpdate upstream) { @Override protected void destroy() { super.destroy(); - compactedRightStampKeys.close(); - compactedRightStampValues.close(); + getUpdateGraph().runWhenIdle(() -> { + compactedRightStampKeys.close(); + compactedRightStampValues.close(); + }); } }); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/asofjoin/BucketedChunkedAjMergedListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/asofjoin/BucketedChunkedAjMergedListener.java index 8496a578178..ebc51824774 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/asofjoin/BucketedChunkedAjMergedListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/asofjoin/BucketedChunkedAjMergedListener.java @@ -791,7 +791,9 @@ private RowSet indexFromBuilder(int slotIndex) { @Override protected void destroy() { super.destroy(); - leftSsaFactory.close(); - rightSsaFactory.close(); + getUpdateGraph().runWhenIdle(() -> { + leftSsaFactory.close(); + rightSsaFactory.close(); + }); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/ByteChunkColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/ByteChunkColumnSource.java index b52645f878b..efb9942419a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/ByteChunkColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/ByteChunkColumnSource.java @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk chunk) { } @Override - public void clear() { + public synchronized void clear() { totalSize = 0; data.forEach(SafeCloseable::close); data.clear(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/CharChunkColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/CharChunkColumnSource.java index 57aeba11fbe..9cf074473dd 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/CharChunkColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/CharChunkColumnSource.java @@ -185,7 +185,7 @@ public void addChunk(@NotNull final WritableChunk chunk) { } @Override - public void clear() { + public synchronized void clear() { totalSize = 0; data.forEach(SafeCloseable::close); data.clear(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/DoubleChunkColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/DoubleChunkColumnSource.java index d620235038c..6348a6857ce 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/DoubleChunkColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/DoubleChunkColumnSource.java @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk chunk) { } @Override - public void clear() { + public synchronized void clear() { totalSize = 0; data.forEach(SafeCloseable::close); data.clear(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/FloatChunkColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/FloatChunkColumnSource.java index 908bcc06fb4..eaf6fa88c5d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/FloatChunkColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/FloatChunkColumnSource.java @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk chunk) { } @Override - public void clear() { + public synchronized void clear() { totalSize = 0; data.forEach(SafeCloseable::close); data.clear(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/IntChunkColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/IntChunkColumnSource.java index 417c568f5fa..e84825a1264 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/IntChunkColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/IntChunkColumnSource.java @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk chunk) { } @Override - public void clear() { + public synchronized void clear() { totalSize = 0; data.forEach(SafeCloseable::close); data.clear(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/LongChunkColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/LongChunkColumnSource.java index 9db4a1ec35d..2a703e0bd16 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/LongChunkColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/LongChunkColumnSource.java @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk chunk) { } @Override - public void clear() { + public synchronized void clear() { totalSize = 0; data.forEach(SafeCloseable::close); data.clear(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/ObjectChunkColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/ObjectChunkColumnSource.java index 5e6bae02f21..69dbcdb3409 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/ObjectChunkColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/ObjectChunkColumnSource.java @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk chunk) { } @Override - public void clear() { + public synchronized void clear() { totalSize = 0; data.forEach(SafeCloseable::close); data.clear(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/ShortChunkColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/ShortChunkColumnSource.java index c89e46d4452..d74ee9050fe 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/ShortChunkColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/chunkcolumnsource/ShortChunkColumnSource.java @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk chunk) { } @Override - public void clear() { + public synchronized void clear() { totalSize = 0; data.forEach(SafeCloseable::close); data.clear(); diff --git a/engine/table/src/main/java/io/deephaven/stream/StreamToBlinkTableAdapter.java b/engine/table/src/main/java/io/deephaven/stream/StreamToBlinkTableAdapter.java index 0543b62b738..08aba9b7f63 100644 --- a/engine/table/src/main/java/io/deephaven/stream/StreamToBlinkTableAdapter.java +++ b/engine/table/src/main/java/io/deephaven/stream/StreamToBlinkTableAdapter.java @@ -19,9 +19,6 @@ import io.deephaven.engine.table.impl.sources.LongAsInstantColumnSource; import io.deephaven.engine.table.impl.sources.NullValueColumnSource; import io.deephaven.engine.table.impl.sources.SwitchColumnSource; -import io.deephaven.engine.updategraph.NotificationQueue; -import io.deephaven.engine.updategraph.UpdateGraph; -import io.deephaven.engine.updategraph.UpdateSourceRegistrar; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.chunk.ChunkType; import io.deephaven.chunk.WritableChunk; @@ -29,6 +26,9 @@ import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.RowSetShiftData; import io.deephaven.engine.rowset.TrackingWritableRowSet; +import io.deephaven.engine.updategraph.NotificationQueue; +import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.engine.updategraph.UpdateSourceRegistrar; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.util.MultiException; @@ -40,11 +40,13 @@ import java.lang.ref.WeakReference; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; @@ -271,6 +273,15 @@ private static void maybeClearChunkColumnSource(ColumnSource cs) { } } + private synchronized void clearChunkColumnSources() { + SafeCloseable.closeAll( + Stream.of(bufferChunkSources, currentChunkSources, prevChunkSources) + .filter(Objects::nonNull) + .flatMap(Arrays::stream) + .map(ccs -> ccs::clear)); + bufferChunkSources = currentChunkSources = prevChunkSources = null; + } + /** * Return the {@link Table#BLINK_TABLE_ATTRIBUTE blink} {@link Table table} that this adapter is producing, and * ensure that this StreamToBlinkTableAdapter no longer enforces strong reachability of the result. May return @@ -306,6 +317,7 @@ public void close() { .endl(); updateSourceRegistrar.removeSource(this); streamPublisher.shutdown(); + getUpdateGraph().runWhenIdle(this::clearChunkColumnSources); } } diff --git a/engine/table/src/test/java/io/deephaven/stream/TestStreamToBlinkTableAdapter.java b/engine/table/src/test/java/io/deephaven/stream/TestStreamToBlinkTableAdapter.java index 5dee5f43d28..26e4928b0a4 100644 --- a/engine/table/src/test/java/io/deephaven/stream/TestStreamToBlinkTableAdapter.java +++ b/engine/table/src/test/java/io/deephaven/stream/TestStreamToBlinkTableAdapter.java @@ -4,7 +4,11 @@ package io.deephaven.stream; import io.deephaven.chunk.attributes.Values; +import io.deephaven.chunk.util.pools.ChunkPoolConstants; +import io.deephaven.chunk.util.pools.ChunkPoolReleaseTracking; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.liveness.LivenessScope; +import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.RowSetShiftData; import io.deephaven.engine.table.Table; @@ -18,6 +22,7 @@ import io.deephaven.engine.table.impl.SimpleListener; import io.deephaven.chunk.*; import io.deephaven.util.BooleanUtils; +import io.deephaven.util.SafeCloseable; import io.deephaven.util.type.ArrayTypeUtils; import junit.framework.TestCase; import org.apache.commons.lang3.mutable.MutableBoolean; @@ -446,6 +451,34 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) { TestCase.assertTrue(listenerFailed.booleanValue()); } + @Test + public void testCleanup() { + final TableDefinition tableDefinition = TableDefinition.from( + List.of("O", "B", "S", "I", "L", "F", "D", "C"), + List.of(String.class, byte.class, short.class, int.class, long.class, float.class, double.class, + char.class)); + final Table tableToAdd = emptyTable(ChunkPoolConstants.SMALLEST_POOLED_CHUNK_CAPACITY).updateView( + "O=Long.toString(ii)", "B=(byte)ii", "S=(short)ii", "I=(int)ii", "L=ii", "F=(float)ii", + "D=(double)ii", "C=(char)ii"); + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + try (final SafeCloseable ignored = LivenessScopeStack.open(new LivenessScope(true), true)) { + final TablePublisher tablePublisher = TablePublisher.of("Test", tableDefinition, null, null); + // Add buffered chunks + tablePublisher.add(tableToAdd); + // Move buffered chunks to current + updateGraph.runWithinUnitTestCycle(() -> { + }); + // Add more buffered chunks + tablePublisher.add(tableToAdd); + // Move current to previous, buffered to current + updateGraph.runWithinUnitTestCycle(() -> { + }); + // Add even more buffered chunks + tablePublisher.add(tableToAdd); + } + ChunkPoolReleaseTracking.check(); + } + private static class DummyStreamPublisher implements StreamPublisher { private boolean fail; diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraph.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraph.java index c29efed8408..b546758e5e1 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraph.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateGraph.java @@ -6,6 +6,7 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.base.verify.Assert; import io.deephaven.io.log.LogEntry; +import io.deephaven.util.annotations.FinalDefault; import io.deephaven.util.function.ThrowingSupplier; import io.deephaven.util.locks.AwareFunctionalLock; import org.jetbrains.annotations.NotNull; @@ -244,4 +245,24 @@ public LogOutput append(LogOutput output) { } // endregion refresh control + + /** + * Run {@code task} immediately if this UpdateGraph is currently idle, else schedule {@code task} to run at a later + * time when it has become idle. + * + * @param task The task to run when idle + */ + @FinalDefault + default void runWhenIdle(@NotNull final Runnable task) { + if (clock().currentState() == LogicalClock.State.Idle) { + task.run(); + } else { + addNotification(new TerminalNotification() { + @Override + public void run() { + task.run(); + } + }); + } + } }