Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wait condition evaluation issue #259

Merged
merged 4 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 35 additions & 42 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -651,55 +651,48 @@ private void RunOnce(bool checkConditions)
while (scheduledTasks.Count > 0)
{
// Run all tasks until empty
RunAllTasks();

// If there are any conditions, schedule a new condition-check task and the run all
// tasks again. This is a scheduled task instead of just run inline because it needs
// to be in the workflow context (i.e. current task scheduler) so the `Workflow`
// methods work properly.
if (checkConditions && conditions.Count > 0)
while (scheduledTasks.Count > 0)
{
_ = QueueNewTaskAsync(CheckConditionsAsync);
RunAllTasks();
}
}
}

private void RunAllTasks()
{
while (scheduledTasks.Count > 0)
{
// Pop last
var task = scheduledTasks.Last!.Value;
scheduledTasks.RemoveLast();
scheduledTaskNodes.Remove(task);
// Pop last
var task = scheduledTasks.Last!.Value;
scheduledTasks.RemoveLast();
scheduledTaskNodes.Remove(task);

// This should never return false
if (!TryExecuteTask(task))
{
logger.LogWarning("Task unexpectedly was unable to execute");
}
if (currentActivationException != null)
{
ExceptionDispatchInfo.Capture(currentActivationException).Throw();
// This should never return false
if (!TryExecuteTask(task))
{
logger.LogWarning("Task unexpectedly was unable to execute");
}
if (currentActivationException != null)
{
ExceptionDispatchInfo.Capture(currentActivationException).Throw();
}
}
}
}

private Task CheckConditionsAsync()
{
try
{
foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2))
// Check conditions. It would be nice if we could run this in the task scheduler
// because then users could have access to the `Workflow` context in the condition
// callback. However, this cannot be done because even just running one task in the
// scheduler causes .NET to add more tasks to the scheduler. And you don't want to

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What other tasks is dotnet adding ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just looked at stack trace, it seems like this is actually in my stack trace logic at

return task.ContinueWith(
(and just below it for the return-type-based one) which is occurring during stack trace test (stack trace disabled by default in .NET). So I could technically disable stack trace continuations during condition checking, but I think the behavior here in the PR of not pushing the condition checks into the task queue is the best anyways.

// "run until empty" with the condition, because conditions may need to be retried
// based on each other. This sounds confusing but basically: can't run check
// conditions in the task scheduler comfortably but still need to access the static
// Workflow class, hence the context override.
if (checkConditions && conditions.Count > 0)
{
source.TrySetResult(null);
Workflow.OverrideContext.Value = this;
try
{
foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2))
{
source.TrySetResult(null);
}
}
finally
{
Workflow.OverrideContext.Value = null;
}
}
}
catch (Exception e)
{
currentActivationException = e;
}
return Task.CompletedTask;
}

private void AddCommand(WorkflowCommand cmd)
Expand Down
13 changes: 12 additions & 1 deletion src/Temporalio/Workflows/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,19 @@ public static WorkflowUpdateDefinition? DynamicUpdate
/// </remarks>
public static DateTime UtcNow => Context.UtcNow;

/// <summary>
/// Gets an async local to override the context.
/// </summary>
/// <remarks>
/// This was only made available so WaitConditionAsync callbacks could have access to the
/// workflow context without running inside the task scheduler.
/// </remarks>
internal static AsyncLocal<IWorkflowContext?> OverrideContext { get; } = new();

private static IWorkflowContext Context =>
TaskScheduler.Current as IWorkflowContext ?? throw new InvalidOperationException("Not in workflow");
TaskScheduler.Current as IWorkflowContext ??
OverrideContext.Value ??
throw new InvalidOperationException("Not in workflow");

/// <summary>
/// Create an exception via lambda invoking the run method that, when thrown out of the
Expand Down
45 changes: 45 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4542,6 +4542,51 @@ await ExecuteWorkerAsync<NextRetryDelayWorkflow>(
new TemporalWorkerOptions().AddActivity(NextRetryDelayWorkflow.NextRetryDelayActivity));
}

[Workflow]
public class ConditionBounceWorkflow
{
private int workflowCounter;
private int signalCounter;

[WorkflowRun]
public async Task RunAsync()
{
while (workflowCounter < 5)
{
var counterBefore = workflowCounter;
await Workflow.WaitConditionAsync(() => workflowCounter > counterBefore);
signalCounter++;
}
}

[WorkflowSignal]
public async Task DoSignalAsync()
{
while (signalCounter < 5)
{
workflowCounter++;
var counterBefore = signalCounter;
await Workflow.WaitConditionAsync(() => signalCounter > counterBefore);
}
}
}

[Fact]
public async Task ExecuteWorkflowAsync_ConditionBounce_ProperlyReschedules()
{
await ExecuteWorkerAsync<ConditionBounceWorkflow>(
async worker =>
{
var handle = await Env.Client.StartWorkflowAsync(
(ConditionBounceWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
await AssertMore.HasEventEventuallyAsync(
handle, evt => evt.WorkflowTaskCompletedEventAttributes != null);
await handle.SignalAsync(wf => wf.DoSignalAsync());
await handle.GetResultAsync();
});
}

internal static Task AssertTaskFailureContainsEventuallyAsync(
WorkflowHandle handle, string messageContains)
{
Expand Down
Loading