diff --git a/src/Temporalio/Worker/WorkflowInstance.cs b/src/Temporalio/Worker/WorkflowInstance.cs index 7c3ba210..b355260c 100644 --- a/src/Temporalio/Worker/WorkflowInstance.cs +++ b/src/Temporalio/Worker/WorkflowInstance.cs @@ -648,38 +648,58 @@ protected override bool TryDequeue(Task task) private void RunOnce(bool checkConditions) { // Run as long as we have scheduled tasks - // TODO(cretz): Fix to run as long as any tasks not yielded on Temporal while (scheduledTasks.Count > 0) { - while (scheduledTasks.Count > 0) + // Run all tasks until empty + RunAllTasks(); + + // If there are any conditions, schedule a new condition-check task and the run all + // tasks again. This is a scheduled task instead of just run inline because it needs + // to be in the workflow context (i.e. current task scheduler) so the `Workflow` + // methods work properly. + if (checkConditions && conditions.Count > 0) { - // Pop last - var task = scheduledTasks.Last!.Value; - scheduledTasks.RemoveLast(); - scheduledTaskNodes.Remove(task); + _ = QueueNewTaskAsync(CheckConditionsAsync); + RunAllTasks(); + } + } + } - // This should never return false - if (!TryExecuteTask(task)) - { - logger.LogWarning("Task unexpectedly was unable to execute"); - } - if (currentActivationException != null) - { - ExceptionDispatchInfo.Capture(currentActivationException).Throw(); - } + private void RunAllTasks() + { + while (scheduledTasks.Count > 0) + { + // Pop last + var task = scheduledTasks.Last!.Value; + scheduledTasks.RemoveLast(); + scheduledTaskNodes.Remove(task); + + // This should never return false + if (!TryExecuteTask(task)) + { + logger.LogWarning("Task unexpectedly was unable to execute"); } + if (currentActivationException != null) + { + ExceptionDispatchInfo.Capture(currentActivationException).Throw(); + } + } + } - // Collect all condition sources to mark complete and then complete them. This - // avoids modify-during-iterate issues. - if (checkConditions) + private Task CheckConditionsAsync() + { + try + { + foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2)) { - var completeConditions = conditions.Where(tuple => tuple.Item1()); - foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2)) - { - source.TrySetResult(null); - } + source.TrySetResult(null); } } + catch (Exception e) + { + currentActivationException = e; + } + return Task.CompletedTask; } private void AddCommand(WorkflowCommand cmd) diff --git a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs index abacdd26..18c66cdb 100644 --- a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs @@ -4373,6 +4373,45 @@ await ExecuteWorkerAsync( }); } + [Workflow] + public class CallWorkflowInWaitConditionWorkflow + { + [WorkflowRun] + public async Task RunAsync() => await Workflow.WaitConditionAsync( + () => !string.IsNullOrEmpty(Workflow.Info.WorkflowId)); + } + + [Fact] + public async Task ExecuteWorkflowAsync_WaitConditionCallingWorkflow_WorksProperly() + { + await ExecuteWorkerAsync(async worker => + { + await Client.ExecuteWorkflowAsync( + (CallWorkflowInWaitConditionWorkflow wf) => wf.RunAsync(), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + }); + } + + [Workflow] + public class WaitConditionExceptionWorkflow + { + [WorkflowRun] + public async Task RunAsync() => await Workflow.WaitConditionAsync( + () => throw new ApplicationFailureException("Intentional error")); + } + + [Fact] + public async Task ExecuteWorkflowAsync_WaitConditionExceptionWorkflow_WorksProperly() + { + await ExecuteWorkerAsync(async worker => + { + var handle = await Client.StartWorkflowAsync( + (WaitConditionExceptionWorkflow wf) => wf.RunAsync(), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + await AssertTaskFailureContainsEventuallyAsync(handle, "Intentional error"); + }); + } + internal static Task AssertTaskFailureContainsEventuallyAsync( WorkflowHandle handle, string messageContains) {