Skip to content

Commit

Permalink
add some comments for clarification
Browse files Browse the repository at this point in the history
  • Loading branch information
mfleader committed Dec 6, 2023
1 parent 54a73f3 commit f8d82c8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 20 deletions.
46 changes: 28 additions & 18 deletions workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,36 +98,38 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte
return nil, fmt.Errorf("failed to add input node (%w)", err)
}

// First we unserialize the input schema
// Stage 1: Unserialize the input schema.
typedInput, err := e.processInput(workflow)
if err != nil {
return nil, err
}

// Then we process the steps. This involves several sub-steps, make sure to check the function.
// Stage 2: Process the steps. This involves several sub-steps, make sure
// to check the function.
runnableSteps, stepOutputProperties, stepLifecycles, stepRunData, err := e.processSteps(workflow, dag, workflowContext)
if err != nil {
return nil, err
}

// Now we can construct an internal data model for the output data model provided by the steps. This is the schema
// the expressions evaluate against. You can use this to do static code analysis on the expressions.
// Stage 3: Construct an internal data model for the output data model
// provided by the steps. This is the schema the expressions evaluate
// against. You can use this to do static code analysis on the expressions.
internalDataModel := e.buildInternalDataModel(typedInput, stepOutputProperties)

// Stage 4: build the DAG dependencies.
// Stage 4: Build the DAG dependencies.
if err := e.connectStepDependencies(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil {
return nil, err
}

// Stage 5: Verify stage inputs
// Now that the output properties are here and the internal data model is here, it should be possible to loop
// through them again to verify that all inputs are valid. So verify that all required inputs are present, schemas
// are valid, etc.
// Do this by looping through the steps' inputs, then verifying that the dag can provide them.
if err := e.verifyWorkflowStageInputs(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil {
// Stage 5: Classify stage inputs.
// Use workflow steps, life cycles, and DAG, as an input (string) into a
// finite state machine Classifier. The input will either be
// accepted (nil), or it will be rejected with one of the error states.
if err := e.classifyWorkflowStageInputs(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil {
return nil, err
}
// Stage 6: The output data model

// Stage 6: Output data model.
//goland:noinspection GoDeprecation
if workflow.Output != nil {
if len(workflow.Outputs) > 0 {
Expand Down Expand Up @@ -171,8 +173,10 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte
}
}

// We don't like cycles as we can't execute them properly. Maybe we can improve this later to actually output the
// cycle to help the user?
// Stage 7: Check DAG acyclicity.
// We don't like cycles as we can't execute them properly.
// Maybe we can improve this later to actually output the cycle to
// help the user?
if dag.HasCycles() {
return nil, fmt.Errorf("your workflow has a cycle")
}
Expand Down Expand Up @@ -329,15 +333,19 @@ func (e *executor) connectStepDependencies(
return nil
}

// verifyWorkflowStageInputs verifies the schemas of the step inputs.
func (e *executor) verifyWorkflowStageInputs(
// classifyWorkflowStageInputs uses workflow steps, life cycles, and DAG,
// as an input (string) into a finite state machine Classifier.
// The function returns whether the input was accepted (nil),
// or rejected with one of the error states.
func (e *executor) classifyWorkflowStageInputs(
workflow *Workflow,
workflowContext map[string][]byte,
stepLifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema],
dag dgraph.DirectedGraph[*DAGItem],
internalDataModel *schema.ScopeSchema,
) error {
// Loop through all the steps in the engine
// Looping through the steps' inputs, then verify that the
// dag can provide them.
for stepID /*stepData is the unused key*/ := range workflow.Steps {
// Then loop through the stages of that step
lifecycle := stepLifecycles[stepID]
Expand Down Expand Up @@ -553,8 +561,8 @@ func (e *executor) buildOutputProperties(

stageOutputsLen := len(stage.Outputs)

// only add stages with outputs to the DAG
if stageOutputsLen > 0 {
// only add stages with outputs
stageOutputProperties := make(map[string]*schema.PropertySchema, stageOutputsLen)
stageOutputs, err2 := e.addOutputProperties(
stage, stepID, runnableStep, dag, stepNode,
Expand All @@ -577,6 +585,8 @@ func (e *executor) buildOutputProperties(
return outputProperties, nil
}

// addOutputProperties adds a step's stage's output properties
// to the given DAG.
func (e *executor) addOutputProperties(
stage step.LifecycleStageWithSchema, stepID string, runnableStep step.RunnableStep,
dag dgraph.DirectedGraph[*DAGItem], stepNode dgraph.Node[*DAGItem],
Expand Down
4 changes: 2 additions & 2 deletions workflow/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,9 @@ func TestDependOnNoOutputs(t *testing.T) {
// - wait_1 = {}; not having a property named 'deploy',
// - wait_1 = { deploy: nil }; the 'deploy' property has no outputs (i.e. nil output)
//
// This is not a robust test. It should be improved, or removed, if it continues to break.
// This is not a robust test. If it continues to break, it should be improved, or removed.
// To improve this test the engine needs to improve observability
// into the workflow's path structure at preparation time.
// into the workflow's expression path data structure at preparation time.
_, err := getTestImplPreparedWorkflow(t, invalidWaitfor)
assert.Error(t, err)
assert.Contains(t, err.Error(), "object wait_1 does not have a property named deploy")
Expand Down

0 comments on commit f8d82c8

Please sign in to comment.