From 049722c7793cb9626d18d68e3b3aa82d6b042b95 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Mon, 3 Jun 2024 14:36:19 -0500 Subject: [PATCH 1/3] Fix wait condition evaluation issue --- src/Temporalio/Worker/WorkflowInstance.cs | 20 ++++++--- .../Worker/WorkflowWorkerTests.cs | 44 +++++++++++++++++++ 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/src/Temporalio/Worker/WorkflowInstance.cs b/src/Temporalio/Worker/WorkflowInstance.cs index b355260c..12540b07 100644 --- a/src/Temporalio/Worker/WorkflowInstance.cs +++ b/src/Temporalio/Worker/WorkflowInstance.cs @@ -653,19 +653,22 @@ private void RunOnce(bool checkConditions) // Run all tasks until empty RunAllTasks(); - // If there are any conditions, schedule a new condition-check task and the run all - // tasks again. This is a scheduled task instead of just run inline because it needs - // to be in the workflow context (i.e. current task scheduler) so the `Workflow` - // methods work properly. + // If there are any conditions, run the condition-check as a task. This is a task + // instead of just run inline because it needs to be in the workflow context (i.e. + // current task scheduler) so the `Workflow` methods work properly. However, we make + // sure that we only run this task because we want the loop to continue with other + // scheduled tasks that the conditions caused to wake up. An original, naive form of + // this ran all tasks including conditions, but that didn't allow conditions that + // depended on each other to properly re-schedule earlier conditions. if (checkConditions && conditions.Count > 0) { _ = QueueNewTaskAsync(CheckConditionsAsync); - RunAllTasks(); + RunAllTasks(singleTaskOnly: true); } } } - private void RunAllTasks() + private void RunAllTasks(bool singleTaskOnly = false) { while (scheduledTasks.Count > 0) { @@ -683,6 +686,11 @@ private void RunAllTasks() { ExceptionDispatchInfo.Capture(currentActivationException).Throw(); } + // We return on single-task-only regardless of whether there are more tasks + if (singleTaskOnly) + { + return; + } } } diff --git a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs index 3f92b15f..08e31731 100644 --- a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs @@ -4543,6 +4543,50 @@ await ExecuteWorkerAsync( new TemporalWorkerOptions().AddActivity(NextRetryDelayWorkflow.NextRetryDelayActivity)); } + [Workflow] + public class ConditionBounceWorkflow + { + private int workflowCounter; + private int signalCounter; + + [WorkflowRun] + public async Task RunAsync() + { + while (workflowCounter < 5) + { + var counterBefore = workflowCounter; + await Workflow.WaitConditionAsync(() => workflowCounter > counterBefore); + signalCounter++; + } + } + + [WorkflowSignal] + public async Task DoSignalAsync() + { + while (signalCounter < 5) + { + workflowCounter++; + var counterBefore = signalCounter; + await Workflow.WaitConditionAsync(() => signalCounter > counterBefore); + } + } + } + + [Fact] + public async Task ExecuteWorkflowAsync_ConditionBounce_ProperlyReschedules() + { + await ExecuteWorkerAsync( + async worker => + { + var handle = await Env.Client.StartWorkflowAsync( + (ConditionBounceWorkflow wf) => wf.RunAsync(), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + await AssertHasEventEventuallyAsync(handle, evt => evt.WorkflowTaskCompletedEventAttributes != null); + await handle.SignalAsync(wf => wf.DoSignalAsync()); + await handle.GetResultAsync(); + }); + } + internal static Task AssertTaskFailureContainsEventuallyAsync( WorkflowHandle handle, string messageContains) { From 99075275785a92737b174ff3f0886e7c0da2aaae Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Mon, 3 Jun 2024 16:38:17 -0500 Subject: [PATCH 2/3] Change approach to effectively revert #242 and use AsyncLocal --- src/Temporalio/Worker/WorkflowInstance.cs | 85 ++++++++++------------- src/Temporalio/Workflows/Workflow.cs | 13 +++- 2 files changed, 47 insertions(+), 51 deletions(-) diff --git a/src/Temporalio/Worker/WorkflowInstance.cs b/src/Temporalio/Worker/WorkflowInstance.cs index 12540b07..e7f38089 100644 --- a/src/Temporalio/Worker/WorkflowInstance.cs +++ b/src/Temporalio/Worker/WorkflowInstance.cs @@ -651,63 +651,48 @@ private void RunOnce(bool checkConditions) while (scheduledTasks.Count > 0) { // Run all tasks until empty - RunAllTasks(); - - // If there are any conditions, run the condition-check as a task. This is a task - // instead of just run inline because it needs to be in the workflow context (i.e. - // current task scheduler) so the `Workflow` methods work properly. However, we make - // sure that we only run this task because we want the loop to continue with other - // scheduled tasks that the conditions caused to wake up. An original, naive form of - // this ran all tasks including conditions, but that didn't allow conditions that - // depended on each other to properly re-schedule earlier conditions. - if (checkConditions && conditions.Count > 0) + while (scheduledTasks.Count > 0) { - _ = QueueNewTaskAsync(CheckConditionsAsync); - RunAllTasks(singleTaskOnly: true); - } - } - } - - private void RunAllTasks(bool singleTaskOnly = false) - { - while (scheduledTasks.Count > 0) - { - // Pop last - var task = scheduledTasks.Last!.Value; - scheduledTasks.RemoveLast(); - scheduledTaskNodes.Remove(task); + // Pop last + var task = scheduledTasks.Last!.Value; + scheduledTasks.RemoveLast(); + scheduledTaskNodes.Remove(task); - // This should never return false - if (!TryExecuteTask(task)) - { - logger.LogWarning("Task unexpectedly was unable to execute"); - } - if (currentActivationException != null) - { - ExceptionDispatchInfo.Capture(currentActivationException).Throw(); - } - // We return on single-task-only regardless of whether there are more tasks - if (singleTaskOnly) - { - return; + // This should never return false + if (!TryExecuteTask(task)) + { + logger.LogWarning("Task unexpectedly was unable to execute"); + } + if (currentActivationException != null) + { + ExceptionDispatchInfo.Capture(currentActivationException).Throw(); + } } - } - } - private Task CheckConditionsAsync() - { - try - { - foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2)) + // Check conditions. It would be nice if we could run this in the task scheduler + // because then users could have access to the `Workflow` context in the condition + // callback. However, this cannot be done because even just running one task in the + // scheduler causes .NET to add more tasks to the scheduler. And you don't want to + // "run until empty" with the condition, because conditions may need to be retried + // based on each other. This sounds confusing but basically: can't run check + // conditions in the task scheduler comfortably but still need to access the static + // Workflow class, hence the context override. + if (checkConditions && conditions.Count > 0) { - source.TrySetResult(null); + Workflow.OverrideContext.Value = this; + try + { + foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2)) + { + source.TrySetResult(null); + } + } + finally + { + Workflow.OverrideContext.Value = null; + } } } - catch (Exception e) - { - currentActivationException = e; - } - return Task.CompletedTask; } private void AddCommand(WorkflowCommand cmd) diff --git a/src/Temporalio/Workflows/Workflow.cs b/src/Temporalio/Workflows/Workflow.cs index b56d09d3..a52edb16 100644 --- a/src/Temporalio/Workflows/Workflow.cs +++ b/src/Temporalio/Workflows/Workflow.cs @@ -196,8 +196,19 @@ public static WorkflowUpdateDefinition? DynamicUpdate /// public static DateTime UtcNow => Context.UtcNow; + /// + /// Gets an async local to override the context. + /// + /// + /// This was only made available so WaitConditionAsync callbacks could have access to the + /// workflow context without running inside the task scheduler. + /// + internal static AsyncLocal OverrideContext { get; } = new(); + private static IWorkflowContext Context => - TaskScheduler.Current as IWorkflowContext ?? throw new InvalidOperationException("Not in workflow"); + TaskScheduler.Current as IWorkflowContext ?? + OverrideContext.Value ?? + throw new InvalidOperationException("Not in workflow"); /// /// Create an exception via lambda invoking the run method that, when thrown out of the From 3924bdaebb581e37ffb7dc64538af62459ffb938 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Tue, 4 Jun 2024 08:25:07 -0500 Subject: [PATCH 3/3] Update assertion call --- tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs index b867f40f..83091554 100644 --- a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs @@ -4580,7 +4580,8 @@ await ExecuteWorkerAsync( var handle = await Env.Client.StartWorkflowAsync( (ConditionBounceWorkflow wf) => wf.RunAsync(), new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); - await AssertHasEventEventuallyAsync(handle, evt => evt.WorkflowTaskCompletedEventAttributes != null); + await AssertMore.HasEventEventuallyAsync( + handle, evt => evt.WorkflowTaskCompletedEventAttributes != null); await handle.SignalAsync(wf => wf.DoSignalAsync()); await handle.GetResultAsync(); });