Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run wait-condition callbacks in workflow context #242

Merged
merged 2 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 43 additions & 23 deletions src/Temporalio/Worker/WorkflowInstance.cs
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So git diff makes these changes look a bit scarier than they are. We just moved the run-all-task loop into a separate method, then enqueue the condition checker task instead of invoking it inline (issuing a run-all-task again for that new task).

Original file line number Diff line number Diff line change
Expand Up @@ -648,38 +648,58 @@ protected override bool TryDequeue(Task task)
private void RunOnce(bool checkConditions)
{
// Run as long as we have scheduled tasks
// TODO(cretz): Fix to run as long as any tasks not yielded on Temporal
while (scheduledTasks.Count > 0)
{
while (scheduledTasks.Count > 0)
// Run all tasks until empty
RunAllTasks();

// If there are any conditions, schedule a new condition-check task and the run all
// tasks again. This is a scheduled task instead of just run inline because it needs
// to be in the workflow context (i.e. current task scheduler) so the `Workflow`
// methods work properly.
if (checkConditions && conditions.Count > 0)
{
// Pop last
var task = scheduledTasks.Last!.Value;
scheduledTasks.RemoveLast();
scheduledTaskNodes.Remove(task);
_ = QueueNewTaskAsync(CheckConditionsAsync);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own understanding in dotnet, Do we detect if the user tries to call any illegal APIs in the wait condition?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't and we probably should. We have been mostly counting on the fact that it's not async to prevent commands, but there are commands like upsert search attribute that are not async. This is actually a gap for queries and update validators too. I have opened #243.

RunAllTasks();
}
}
}

// This should never return false
if (!TryExecuteTask(task))
{
logger.LogWarning("Task unexpectedly was unable to execute");
}
if (currentActivationException != null)
{
ExceptionDispatchInfo.Capture(currentActivationException).Throw();
}
private void RunAllTasks()
{
while (scheduledTasks.Count > 0)
{
// Pop last
var task = scheduledTasks.Last!.Value;
scheduledTasks.RemoveLast();
scheduledTaskNodes.Remove(task);

// This should never return false
if (!TryExecuteTask(task))
{
logger.LogWarning("Task unexpectedly was unable to execute");
}
if (currentActivationException != null)
{
ExceptionDispatchInfo.Capture(currentActivationException).Throw();
}
}
}

// Collect all condition sources to mark complete and then complete them. This
// avoids modify-during-iterate issues.
if (checkConditions)
private Task CheckConditionsAsync()
{
try
{
foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2))
{
var completeConditions = conditions.Where(tuple => tuple.Item1());
foreach (var source in conditions.Where(t => t.Item1()).Select(t => t.Item2))
{
source.TrySetResult(null);
}
source.TrySetResult(null);
}
}
catch (Exception e)
{
currentActivationException = e;
}
return Task.CompletedTask;
}

private void AddCommand(WorkflowCommand cmd)
Expand Down
39 changes: 39 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4373,6 +4373,45 @@ await ExecuteWorkerAsync<TickingWorkflow>(
});
}

[Workflow]
public class CallWorkflowInWaitConditionWorkflow
{
[WorkflowRun]
public async Task RunAsync() => await Workflow.WaitConditionAsync(
() => !string.IsNullOrEmpty(Workflow.Info.WorkflowId));
}

[Fact]
public async Task ExecuteWorkflowAsync_WaitConditionCallingWorkflow_WorksProperly()
{
await ExecuteWorkerAsync<CallWorkflowInWaitConditionWorkflow>(async worker =>
{
await Client.ExecuteWorkflowAsync(
(CallWorkflowInWaitConditionWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
});
}

[Workflow]
public class WaitConditionExceptionWorkflow
{
[WorkflowRun]
public async Task RunAsync() => await Workflow.WaitConditionAsync(
() => throw new ApplicationFailureException("Intentional error"));
}

[Fact]
public async Task ExecuteWorkflowAsync_WaitConditionExceptionWorkflow_WorksProperly()
{
await ExecuteWorkerAsync<WaitConditionExceptionWorkflow>(async worker =>
{
var handle = await Client.StartWorkflowAsync(
(WaitConditionExceptionWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
await AssertTaskFailureContainsEventuallyAsync(handle, "Intentional error");
});
}

internal static Task AssertTaskFailureContainsEventuallyAsync(
WorkflowHandle handle, string messageContains)
{
Expand Down
Loading