diff --git a/core/common/saker/build/task/TaskExecutionManager.java b/core/common/saker/build/task/TaskExecutionManager.java index 4f2b02b1..bf8bd899 100644 --- a/core/common/saker/build/task/TaskExecutionManager.java +++ b/core/common/saker/build/task/TaskExecutionManager.java @@ -2456,9 +2456,22 @@ public TaskDependencies getDependencies() { } private static class WaiterThreadHandle extends WeakReference { + /** + * Initial state, the handle hasn't been used yet anywhere. It hasn't been added to futures for waiting. + */ static final int STATE_INITIAL = 0; + /** + * The handle is waiting for notification, and has been added to the waiting thread collection of the relevant + * futures. + */ static final int STATE_WAITING = 1; + /** + * The handle has been notified and should re-check the condition. + */ static final int STATE_NOTIFIED = 2; + /** + * The handle is finished, no longer needs notification. + */ static final int STATE_FINISHED = 3; static final AtomicIntegerFieldUpdater AIFU_state = AtomicIntegerFieldUpdater @@ -2472,6 +2485,17 @@ public WaiterThreadHandle(int triggerEvents) { this.triggerEvents = triggerEvents; } + /** + * Notifies and unparks the thread handle. + *

+ * The method always unparks the associated thread, unless the state is {@link #STATE_FINISHED}. + *

+ * The method sets the state to {@link #STATE_NOTIFIED}, and handles the waiting thread count adjustments. + * + * @param execmanager + * The execution manager. + * @return false if the thread handle is finished, and can be released. + */ public boolean unparkNotify(TaskExecutionManager execmanager) { while (true) { int s = this.state; @@ -3119,7 +3143,7 @@ private void setResultState(TaskExecutionManager execmanager, FutureState s, Fut } throw new AssertionError("Failed to set state for " + taskId + " (" + this.futureState + ") " + nstate); } - unparkAllWaitingThreads(execmanager); + unparkWaitingThreadsForResult(execmanager); } protected TaskResultHolder getWaitWithoutOutputChangeDetector(TaskExecutorContext realcontext) @@ -3937,7 +3961,8 @@ protected void deadlocked() { || s.state == STATE_INITIALIZING; s = this.futureState) { if (ARFU_futureState.compareAndSet(this, s, new DeadlockedFutureState<>(s.getFactory(), s.getInvocationConfiguration(), this.taskId))) { - for (WaiterThreadHandle t; (t = waitingThreads.poll()) != null;) { + ConcurrentLinkedQueue threadqueue = waitingThreads; + for (WaiterThreadHandle t; (t = threadqueue.poll()) != null;) { LockSupport.unpark(t.get()); } break; @@ -3958,10 +3983,20 @@ protected void unparkWaitingThreads(TaskExecutionManager execmanager, int event) } } - protected void unparkAllWaitingThreads(TaskExecutionManager execmanager) { - ConcurrentLinkedQueue threadqueue = waitingThreads; - for (WaiterThreadHandle t; (t = threadqueue.poll()) != null;) { - t.unparkNotify(execmanager); + protected void unparkWaitingThreadsForResult(TaskExecutionManager execmanager) { + for (Iterator it = waitingThreads.iterator(); it.hasNext();) { + WaiterThreadHandle t = it.next(); + int triggerevents = t.triggerEvents; + if ((triggerevents & STATE_RESULT_READY) == 0) { + //the thread is not interested in the RESULT_READY event + continue; + } + if (!t.unparkNotify(execmanager) || triggerevents == STATE_RESULT_READY) { + //thread handle finished + // OR + //only interested in the RESULT_READY event, so it can be removed + it.remove(); + } } } @@ -3969,7 +4004,7 @@ protected boolean unparkOneWaitingThread(TaskExecutionManager execmanager) { for (Iterator it = waitingThreads.iterator(); it.hasNext();) { WaiterThreadHandle t = it.next(); if (!t.unparkNotify(execmanager)) { - //thread handle finished + //thread handle finished, continue attempting to unpark the next one it.remove(); } else { return true; @@ -6376,10 +6411,6 @@ private void executeTaskRunning(TaskExecutionResult previousExecutionResu // throw exc; // } } - if (TestFlag.ENABLED) { - TestFlag.metric().taskFinished(taskid, factory, result, executionresult.getTaggedOutputs(), - executionresult.getMetaDatas()); - } executiondependencies.setSelfOutputChangeDetector(taskcontext.reportedOutputChangeDetector); @@ -6393,6 +6424,12 @@ private void executeTaskRunning(TaskExecutionResult previousExecutionResu future.finished(this, executionresult); + if (TestFlag.ENABLED) { + //call this after the future.finished() call + TestFlag.metric().taskFinished(taskid, factory, result, executionresult.getTaggedOutputs(), + executionresult.getMetaDatas()); + } + if (hasabortedexception) { taskRunningFailureExceptions.add(ImmutableUtils.makeImmutableMapEntry(taskid, createFailException(taskid, taskrunningexception, abortexceptions))); diff --git a/test/tests/src/testing/saker/build/tests/tasks/AddAncestorBlockingWaitTaskTest.java b/test/tests/src/testing/saker/build/tests/tasks/AddAncestorBlockingWaitTaskTest.java index d3bc95dc..cedabc5e 100644 --- a/test/tests/src/testing/saker/build/tests/tasks/AddAncestorBlockingWaitTaskTest.java +++ b/test/tests/src/testing/saker/build/tests/tasks/AddAncestorBlockingWaitTaskTest.java @@ -15,11 +15,15 @@ */ package testing.saker.build.tests.tasks; +import java.io.IOException; + import saker.build.task.TaskContext; import saker.build.task.TaskFactory; import saker.build.task.TaskFuture; import testing.saker.SakerTest; +import testing.saker.build.flag.TestFlag; import testing.saker.build.tests.CollectingMetricEnvironmentTestCase; +import testing.saker.build.tests.CollectingTestMetric; import testing.saker.build.tests.ExecutionOrderer; import testing.saker.build.tests.tasks.factories.ChildTaskStarterTaskFactory; import testing.saker.build.tests.tasks.factories.StringTaskFactory; @@ -65,7 +69,7 @@ public class AddAncestorBlockingWaitTaskTest extends CollectingMetricEnvironment /** * The plus task has finished, and its result has been waited for by main. */ - private static final String SECTION_PLUS_FINISHED = "plus_started"; + private static final String SECTION_PLUS_FINISHED = "plus_finished"; /** * The str task has ben waited for by waiter. */ @@ -78,6 +82,8 @@ public class AddAncestorBlockingWaitTaskTest extends CollectingMetricEnvironment private static ExecutionOrderer orderer; private static volatile boolean gotStrTaskResultByWaiter = false; + private static volatile boolean waitStrFinishInStarter = false; + private static class StarterTaskFactory extends SelfStatelessTaskFactory { private static final long serialVersionUID = 1L; @@ -86,8 +92,24 @@ public Void run(TaskContext taskcontext) throws Exception { taskcontext.getTaskUtilities().startTaskFuture(strTaskId("waiter"), new WaiterTaskFactory()); taskcontext.getTaskUtilities().startTaskFuture(strTaskId("blocker"), new BlockerStarterTaskFactory()); orderer.enter(SECTION_PLUS_STARTER); - taskcontext.getTaskUtilities().runTaskResult(strTaskId("plus"), - new ChildTaskStarterTaskFactory().add(strTaskId("str"), new StringTaskFactory("str"))); + ChildTaskStarterTaskFactory childstarter = new ChildTaskStarterTaskFactory() { + @Override + public Void run(TaskContext context) throws Exception { + if (waitStrFinishInStarter) { + System.out.println("Wait result of str task before starting it..."); + while (!((CollectingTestMetric) TestFlag.metric()).getRunTaskIdResults() + .containsKey(strTaskId("str"))) { + Thread.sleep(100); + } + + System.out.println("Got result of str through test metric."); + } + return super.run(context); + } + }; + childstarter.add(strTaskId("str"), new StringTaskFactory("str")); + + taskcontext.getTaskUtilities().runTaskResult(strTaskId("plus"), childstarter); orderer.enter(SECTION_PLUS_FINISHED); return null; } @@ -126,6 +148,29 @@ public Void run(TaskContext taskcontext) throws Exception { @Override protected void runTestImpl() throws Throwable { + for (int i = 0; i < 10; i++) { + waitStrFinishInStarter = false; + runMainTask(); + cleanProject(); + System.out.println(); + + System.out.println("Wait str:"); + waitStrFinishInStarter = true; + runMainTask(); + cleanProject(); + System.out.println(); + } + } + + private void cleanProject() throws IOException { + if (project != null) { + project.clean(); + } else { + files.clearDirectoryRecursively(PATH_BUILD_DIRECTORY); + } + } + + private void runMainTask() throws Throwable, AssertionError { gotStrTaskResultByWaiter = false; ExecutionOrderer orderer = new ExecutionOrderer(); orderer.addSection(SECTION_WAITER_START); @@ -139,6 +184,8 @@ protected void runTestImpl() throws Throwable { AddAncestorBlockingWaitTaskTest.orderer = new ExecutionOrderer(orderer); runTask("main", main); + assertEquals(getMetric().getRunTaskIdFactories().keySet(), + strTaskIdSetOf("main", "blocker", "str", "waiter", "plus")); } } diff --git a/test/tests/src/testing/saker/build/tests/tasks/factories/ChildTaskStarterTaskFactory.java b/test/tests/src/testing/saker/build/tests/tasks/factories/ChildTaskStarterTaskFactory.java index e6682595..ed402e6d 100644 --- a/test/tests/src/testing/saker/build/tests/tasks/factories/ChildTaskStarterTaskFactory.java +++ b/test/tests/src/testing/saker/build/tests/tasks/factories/ChildTaskStarterTaskFactory.java @@ -30,7 +30,7 @@ import saker.build.task.identifier.TaskIdentifier; import saker.build.thirdparty.saker.util.io.SerialUtils; -public class ChildTaskStarterTaskFactory implements TaskFactory, Externalizable { +public class ChildTaskStarterTaskFactory implements TaskFactory, Task, Externalizable { private static final long serialVersionUID = 1L; private Map> namedChildTaskValues = new HashMap<>(); @@ -64,16 +64,15 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept @Override public Task createTask(ExecutionContext context) { - return new Task() { - @Override - public Void run(TaskContext context) { - for (Entry> entry : namedChildTaskValues - .entrySet()) { - context.getTaskUtilities().startTaskFuture(entry.getKey(), entry.getValue()); - } - return null; - } - }; + return this; + } + + @Override + public Void run(TaskContext context) throws Exception { + for (Entry> entry : namedChildTaskValues.entrySet()) { + context.getTaskUtilities().startTaskFuture(entry.getKey(), entry.getValue()); + } + return null; } @Override diff --git a/test/utils/src/testing/saker/build/tests/ExecutionOrderer.java b/test/utils/src/testing/saker/build/tests/ExecutionOrderer.java index 4f210f8a..522aeaa3 100644 --- a/test/utils/src/testing/saker/build/tests/ExecutionOrderer.java +++ b/test/utils/src/testing/saker/build/tests/ExecutionOrderer.java @@ -15,6 +15,8 @@ */ package testing.saker.build.tests; +import java.time.Instant; +import java.time.format.DateTimeFormatter; import java.util.LinkedList; import java.util.Objects; @@ -39,24 +41,34 @@ public void addSection(String id) { public synchronized void enter(String id) throws InterruptedException { try { + if (Thread.interrupted()) { + //check interruption before entering + //so if the thread is already interrupted when this method is called, then + //we throw an exception and dont consume a section (so errors are logged more appropriately.) + throw new InterruptedException(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + + " Interrupted while waiting for: " + id + " in " + order); + } while (true) { String first = order.peekFirst(); if (first == null) { throw new IllegalArgumentException("No more sections."); } if (first.equals(id)) { - System.out.println("Reached: " + id); + System.out.println( + DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + " ExecutionOrderer reached: " + id); order.pollFirst(); this.notifyAll(); return; } if (!order.contains(id)) { - throw new IllegalArgumentException("No section found: " + id + " in " + order); + throw new IllegalArgumentException(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + + " No section found: " + id + " in " + order); } this.wait(); } } catch (InterruptedException e) { - throw new InterruptedException("Interrupted while waiting for: " + id + " in " + order); + throw new InterruptedException(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + + " Interrupted while waiting for: " + id + " in " + order); } } @@ -70,7 +82,11 @@ public boolean isAnySectionRemaining() { @Override public String toString() { - return "ExecutionOrderer[" + order + "]"; + String orderstr; + synchronized (this) { + orderstr = order.toString(); + } + return getClass().getSimpleName() + "[" + orderstr + "]"; } }