Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactors workflows engine loop #14375

Merged
merged 41 commits into from
Oct 7, 2024

Conversation

vyzaldysanchez
Copy link
Contributor

Requires Dependencies

Resolves Dependencies

@vyzaldysanchez vyzaldysanchez marked this pull request as ready for review September 11, 2024 19:22
@vyzaldysanchez vyzaldysanchez requested a review from a team as a code owner September 11, 2024 19:22
jmank88
jmank88 previously approved these changes Sep 11, 2024
// Executed synchronously to ensure we correctly schedule subsequent tasks.
err := e.handleStepUpdate(ctx, stepUpdate)
e.logger.Debugw(fmt.Sprintf("received step update for execution %s", stepUpdate.ExecutionID),
eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be eIDKey instead of a key for step update execution IDs?

})
if !added {
// skip this execution since there's already a stepUpdateLoop running for the execution ID
e.logger.With(eIDKey, executionID).Debugf("won't start execution for execution %s, execution was already started", executionID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/nit call With once at line 510?

lggr := e.logger.With("event", event, eIDKey, executionID)

return err
}
for _, sd := range stepDependents {
e.queueIfReady(state, sd)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it cleaner wrt to separation of concerns if handleStepUpdate isn't responsible for queue'ing dependents?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to queue is really part of handling the update here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't they separate concerns? If there was a parent orchestrator that called handleStepUpdate and then processed the dependents, that wouldn't be the case.

func (e *Engine) isWorkflowFullyProcessed(state store.WorkflowExecution) (bool, string, error) {
statuses := map[string]string{}
// we need to first propagate the status of the errored status if it exists...
err := e.workflow.walkDo(workflows.KeywordTrigger, func(s *step) error {
Copy link
Contributor

@patrickhuie19 patrickhuie19 Oct 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do these funcs passed into walkDo need to be anonymous? unit testing isWorkflowFullyProcessed seems like it could be cleaner if you could call into these

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they can be named, I see no issues there.


var hasErrored, hasTimedOut, hasCompletedEarlyExit bool
// Let's determine the status of the workflow.
for _, status := range statuses {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't read into walkDo suggest, but its name suggests that its walking the DAG of the workflow steps for a provided func. In this case, is the error status we present to the workflow executor the first errored status the walker finds (b/c that status is what is propagated to all the dependends of an error'd step)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We consider 3 states other than completed, to consider the workflow processed: error, completed_early_exit and timeout.

The way we return the error to the executor is basically as follows: if there's a single error, then it is considered error, if there's a single timeout, then it is timedout, and if none of the others, if there's a completed_early_exit, then it is considered as such.

The precedence of preference is error -> timeout -> completed_early_exit.

Not sure if this answers your question clearly, please let me know @patrickhuie19.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To restate, these states are mutually exclusive and propagated to their dependents. If we traverse the tree in the order of executions we could return the first non-complete status we see, right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants