diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java index 01d180c4d6d..d7559c626d6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java @@ -297,9 +297,7 @@ public PartitionedTable transform( // Perform the transformation final Table resultTable = prepared.update(List.of(new TableTransformationColumn( constituentColumnName, - maybeReplaceExecContext(executionContext), // THIS ONE MUST NOT HAVE THE SAME OT AS THIS CURRENT - // THREAD's EXEC CONTEXT, - // UNLESS NON_PARALLELIZABLE + maybeReplaceExecContext(executionContext), prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))); // Make sure we have a valid result constituent definition @@ -321,6 +319,12 @@ public PartitionedTable transform( return resultPartitionedTable; } + /** + * Ensures that the returned executionContext will have an OperationInitializer compatible with being called by work + * already running on an initialization thread - it must either already return false for + * {@link OperationInitializer#canParallelize()}, or must be a different instance than the current context's + * OperationInitializer. + */ private static ExecutionContext maybeReplaceExecContext(ExecutionContext provided) { if (provided == null) { return null; @@ -370,9 +374,7 @@ public PartitionedTable partitionedTransform( .update(List.of(new BiTableTransformationColumn( constituentColumnName, RHS_CONSTITUENT, - maybeReplaceExecContext(executionContext), // THIS ONE MUST NOT HAVE THE SAME OT AS THIS - // CURRENT THREAD's EXEC - // CONTEXT, UNLESS NON_PARALLELIZABLE + maybeReplaceExecContext(executionContext), prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))) .dropColumns(RHS_CONSTITUENT); diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index 5efe37a20b3..acc4ea026ee 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -1212,7 +1212,7 @@ public Builder numUpdateThreads(int numUpdateThreads) { /** * Sets a functional interface that adds custom initialization for threads started by this UpdateGraph. - * + * * @param threadInitializationFactory the function to invoke on any runnables that will be used to start threads * @return this builder */ diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java index 84317389a9f..d5d4337e292 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java @@ -4,7 +4,7 @@ import java.util.concurrent.Future; /** - * alt naming: OperationParallelismControl? + * Provides guidance for initialization operations on how they can parallelize. */ public interface OperationInitializer { OperationInitializer NON_PARALLELIZABLE = new OperationInitializer() { @@ -26,21 +26,17 @@ public int parallelismFactor() { }; /** - * @return Whether the current thread can parallelize operations using this OperationInitialization. + * Whether the current thread can parallelize operations using this OperationInitialization. */ boolean canParallelize(); /** * Submits a task to run in this thread pool. - * - * @param runnable - * @return */ Future submit(Runnable runnable); /** - * - * @return + * Number of threads that are potentially available. */ int parallelismFactor(); } diff --git a/props/configs/src/main/resources/dh-defaults.prop b/props/configs/src/main/resources/dh-defaults.prop index d8276ae33b9..be2dcba5eb9 100644 --- a/props/configs/src/main/resources/dh-defaults.prop +++ b/props/configs/src/main/resources/dh-defaults.prop @@ -60,7 +60,3 @@ client.configuration.list=java.version,deephaven.version,barrage.version,http.se # jar, and a class that is found in that jar. Any such keys will be made available to the client.configuration.list # as .version. client.version.list=deephaven=io.deephaven.engine.table.Table,barrage=io.deephaven.barrage.flatbuf.BarrageMessageWrapper - - -# Specifies additional setup to run on threads that can perform table operations with user code. Comma-separated list, instances must be of type io.deephaven.util.thread.ThreadInitializationFactory -thread.initialization= diff --git a/props/test-configs/src/main/resources/dh-tests.prop b/props/test-configs/src/main/resources/dh-tests.prop index 64dfddae10a..f7d2503aa35 100644 --- a/props/test-configs/src/main/resources/dh-tests.prop +++ b/props/test-configs/src/main/resources/dh-tests.prop @@ -102,4 +102,3 @@ client.version.list= authentication.anonymous.warn=false deephaven.console.type=none -thread.initialization=