diff --git a/src/Temporalio/Worker/WorkflowInstance.cs b/src/Temporalio/Worker/WorkflowInstance.cs index b355260c..e7f38089 100644 --- a/src/Temporalio/Worker/WorkflowInstance.cs +++ b/src/Temporalio/Worker/WorkflowInstance.cs @@ -651,55 +651,48 @@ private void RunOnce(bool checkConditions) while (scheduledTasks.Count > 0) { // Run all tasks until empty - RunAllTasks(); - - // If there are any conditions, schedule a new condition-check task and the run all - // tasks again. This is a scheduled task instead of just run inline because it needs - // to be in the workflow context (i.e. current task scheduler) so the `Workflow` - // methods work properly. - if (checkConditions && conditions.Count > 0) + while (scheduledTasks.Count > 0) { - _ = QueueNewTaskAsync(CheckConditionsAsync); - RunAllTasks(); - } - } - } - - private void RunAllTasks() - { - while (scheduledTasks.Count > 0) - { - // Pop last - var task = scheduledTasks.Last!.Value; - scheduledTasks.RemoveLast(); - scheduledTaskNodes.Remove(task); + // Pop last + var task = scheduledTasks.Last!.Value; + scheduledTasks.RemoveLast(); + scheduledTaskNodes.Remove(task); - // This should never return false - if (!TryExecuteTask(task)) - { - logger.LogWarning("Task unexpectedly was unable to execute"); - } - if (currentActivationException != null) - { - ExceptionDispatchInfo.Capture(currentActivationException).Throw(); + // This should never return false + if (!TryExecuteTask(task)) + { + logger.LogWarning("Task unexpectedly was unable to execute"); + } + if (currentActivationException != null) + { + ExceptionDispatchInfo.Capture(currentActivationException).Throw(); + } } - } - } - private Task CheckConditionsAsync() - { - try - { - foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2)) + // Check conditions. It would be nice if we could run this in the task scheduler + // because then users could have access to the `Workflow` context in the condition + // callback. However, this cannot be done because even just running one task in the + // scheduler causes .NET to add more tasks to the scheduler. And you don't want to + // "run until empty" with the condition, because conditions may need to be retried + // based on each other. This sounds confusing but basically: can't run check + // conditions in the task scheduler comfortably but still need to access the static + // Workflow class, hence the context override. + if (checkConditions && conditions.Count > 0) { - source.TrySetResult(null); + Workflow.OverrideContext.Value = this; + try + { + foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2)) + { + source.TrySetResult(null); + } + } + finally + { + Workflow.OverrideContext.Value = null; + } } } - catch (Exception e) - { - currentActivationException = e; - } - return Task.CompletedTask; } private void AddCommand(WorkflowCommand cmd) diff --git a/src/Temporalio/Workflows/Workflow.cs b/src/Temporalio/Workflows/Workflow.cs index b56d09d3..a52edb16 100644 --- a/src/Temporalio/Workflows/Workflow.cs +++ b/src/Temporalio/Workflows/Workflow.cs @@ -196,8 +196,19 @@ public static WorkflowUpdateDefinition? DynamicUpdate /// public static DateTime UtcNow => Context.UtcNow; + /// + /// Gets an async local to override the context. + /// + /// + /// This was only made available so WaitConditionAsync callbacks could have access to the + /// workflow context without running inside the task scheduler. + /// + internal static AsyncLocal OverrideContext { get; } = new(); + private static IWorkflowContext Context => - TaskScheduler.Current as IWorkflowContext ?? throw new InvalidOperationException("Not in workflow"); + TaskScheduler.Current as IWorkflowContext ?? + OverrideContext.Value ?? + throw new InvalidOperationException("Not in workflow"); /// /// Create an exception via lambda invoking the run method that, when thrown out of the diff --git a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs index 4e1a32de..83091554 100644 --- a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs @@ -4542,6 +4542,51 @@ await ExecuteWorkerAsync( new TemporalWorkerOptions().AddActivity(NextRetryDelayWorkflow.NextRetryDelayActivity)); } + [Workflow] + public class ConditionBounceWorkflow + { + private int workflowCounter; + private int signalCounter; + + [WorkflowRun] + public async Task RunAsync() + { + while (workflowCounter < 5) + { + var counterBefore = workflowCounter; + await Workflow.WaitConditionAsync(() => workflowCounter > counterBefore); + signalCounter++; + } + } + + [WorkflowSignal] + public async Task DoSignalAsync() + { + while (signalCounter < 5) + { + workflowCounter++; + var counterBefore = signalCounter; + await Workflow.WaitConditionAsync(() => signalCounter > counterBefore); + } + } + } + + [Fact] + public async Task ExecuteWorkflowAsync_ConditionBounce_ProperlyReschedules() + { + await ExecuteWorkerAsync( + async worker => + { + var handle = await Env.Client.StartWorkflowAsync( + (ConditionBounceWorkflow wf) => wf.RunAsync(), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + await AssertMore.HasEventEventuallyAsync( + handle, evt => evt.WorkflowTaskCompletedEventAttributes != null); + await handle.SignalAsync(wf => wf.DoSignalAsync()); + await handle.GetResultAsync(); + }); + } + internal static Task AssertTaskFailureContainsEventuallyAsync( WorkflowHandle handle, string messageContains) {