diff --git a/workflow/executor.go b/workflow/executor.go index bcb02c26..e3b0d002 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -92,42 +92,44 @@ type executor struct { // 5. We can now construct the output data model of the workflow. func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte) (ExecutableWorkflow, error) { dag := dgraph.New[*DAGItem]() - if _, err := dag.AddNode("input", &DAGItem{ + if _, err := dag.AddNode(WorkflowInputKey, &DAGItem{ Kind: "input", }); err != nil { return nil, fmt.Errorf("failed to add input node (%w)", err) } - // First we unserialize the input schema + // Stage 1: Unserialize the input schema. typedInput, err := e.processInput(workflow) if err != nil { return nil, err } - // Then we process the steps. This involves several sub-steps, make sure to check the function. + // Stage 2: Process the steps. This involves several sub-steps, make sure + // to check the function. runnableSteps, stepOutputProperties, stepLifecycles, stepRunData, err := e.processSteps(workflow, dag, workflowContext) if err != nil { return nil, err } - // Now we can construct an internal data model for the output data model provided by the steps. This is the schema - // the expressions evaluate against. You can use this to do static code analysis on the expressions. + // Stage 3: Construct an internal data model for the output data model + // provided by the steps. This is the schema the expressions evaluate + // against. You can use this to do static code analysis on the expressions. internalDataModel := e.buildInternalDataModel(typedInput, stepOutputProperties) - // Stage 4: build the DAG dependencies. + // Stage 4: Build the DAG dependencies. if err := e.connectStepDependencies(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil { return nil, err } - // Stage 5: Verify stage inputs - // Now that the output properties are here and the internal data model is here, it should be possible to loop - // through them again to verify that all inputs are valid. So verify that all required inputs are present, schemas - // are valid, etc. - // Do this by looping through the steps' inputs, then verifying that the dag can provide them. - if err := e.verifyWorkflowStageInputs(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil { + // Stage 5: Classify stage inputs. + // Use workflow steps, life cycles, and DAG, as an input (string) into a + // finite state machine Classifier. The input will either be + // accepted (nil), or it will be rejected with one of the error states. + if err := e.classifyWorkflowStageInputs(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil { return nil, err } - // Stage 6: The output data model + + // Stage 6: Output data model. //goland:noinspection GoDeprecation if workflow.Output != nil { if len(workflow.Outputs) > 0 { @@ -171,8 +173,10 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte } } - // We don't like cycles as we can't execute them properly. Maybe we can improve this later to actually output the - // cycle to help the user? + // Stage 7: Check DAG acyclicity. + // We don't like cycles as we can't execute them properly. + // Maybe we can improve this later to actually output the cycle to + // help the user? if dag.HasCycles() { return nil, fmt.Errorf("your workflow has a cycle") } @@ -314,13 +318,13 @@ func (e *executor) connectStepDependencies( stageData[inputField] = data } if err := e.prepareDependencies(workflowContext, data, currentStageNode, internalDataModel, dag); err != nil { - return fmt.Errorf("failed to build dependency tree for %s (%w)", currentStageNode.ID(), err) + return fmt.Errorf("failed to build dependency tree for '%s' (%w)", currentStageNode.ID(), err) } } currentStageNode.Item().Data = stageData if len(stage.InputSchema) > 0 { currentStageNode.Item().DataSchema = schema.NewObjectSchema( - "input", + WorkflowInputKey, stage.InputSchema, ) } @@ -329,15 +333,19 @@ func (e *executor) connectStepDependencies( return nil } -// verifyWorkflowStageInputs verifies the schemas of the step inputs. -func (e *executor) verifyWorkflowStageInputs( +// classifyWorkflowStageInputs uses workflow steps, life cycles, and DAG, +// as an input (string) into a finite state machine Classifier. +// The function returns whether the input was accepted (nil), +// or rejected with one of the error states. +func (e *executor) classifyWorkflowStageInputs( workflow *Workflow, workflowContext map[string][]byte, stepLifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema], dag dgraph.DirectedGraph[*DAGItem], internalDataModel *schema.ScopeSchema, ) error { - // Loop through all the steps in the engine + // Looping through the steps' inputs, then verify that the + // dag can provide them. for stepID /*stepData is the unused key*/ := range workflow.Steps { // Then loop through the stages of that step lifecycle := stepLifecycles[stepID] @@ -383,7 +391,7 @@ func (e *executor) verifyStageInputs( // It is present, so make sure it is compatible. err := e.preValidateCompatibility(internalDataModel, providedInputForField, stageInputSchema, workflowContext) if err != nil { - return fmt.Errorf("input validation failed for workflow step %s stage %s (%w)", stepID, stage.ID, err) + return fmt.Errorf("input validation failed for workflow step '%s' stage '%s' (%w)", stepID, stage.ID, err) } } } @@ -493,7 +501,7 @@ func (e *executor) buildInternalDataModel(input schema.Scope, stepOutputProperti schema.NewObjectSchema( "workflow", map[string]*schema.PropertySchema{ - "input": schema.NewPropertySchema( + WorkflowInputKey: schema.NewPropertySchema( input, schema.NewDisplayValue( schema.PointerTo("Input"), @@ -507,9 +515,9 @@ func (e *executor) buildInternalDataModel(input schema.Scope, stepOutputProperti nil, nil, ), - "steps": schema.NewPropertySchema( + WorkflowStepsKey: schema.NewPropertySchema( schema.NewObjectSchema( - "steps", + WorkflowStepsKey, stepOutputProperties, ), nil, @@ -550,35 +558,22 @@ func (e *executor) buildOutputProperties( err, ) } - stageOutputProperties := make(map[string]*schema.PropertySchema, len(stage.Outputs)) - for outputID, outputSchema := range stage.Outputs { - stageDAGItem := &DAGItem{ - Kind: DAGItemKindStepStageOutput, - StepID: stepID, - StageID: stage.ID, - OutputID: outputID, - Provider: runnableStep, - } - stageNode, err := dag.AddNode(stageDAGItem.String(), stageDAGItem) - if err != nil { - return nil, fmt.Errorf( - "failed to add output %s for stage %s in step %s to DAG (%w)", - outputID, - stage.ID, - stepID, - err, - ) - } - if err := stepNode.Connect(stageNode.ID()); err != nil { - return nil, fmt.Errorf( - "failed to connect stage %s to its output %s in step %s (%w)", - stage.ID, outputID, stepID, err, - ) + + stageOutputsLen := len(stage.Outputs) + + // only add stages with outputs to the DAG + if stageOutputsLen > 0 { + stageOutputProperties := make(map[string]*schema.PropertySchema, stageOutputsLen) + stageOutputs, err2 := e.addOutputProperties( + stage, stepID, runnableStep, dag, stepNode, + stageOutputProperties) + if err2 != nil { + return nil, err2 } - stageOutputProperties[outputID] = schema.NewPropertySchema( - outputSchema.Schema(), - outputSchema.Display(), - false, + outputProperties[stage.ID] = schema.NewPropertySchema( + stageOutputs, + nil, + true, nil, nil, nil, @@ -586,16 +581,45 @@ func (e *executor) buildOutputProperties( nil, ) } + } + return outputProperties, nil +} - stageOutputs := schema.NewObjectSchema( - GetStageNodeID(stepID, stage.ID), - stageOutputProperties, - ) +// addOutputProperties adds a step's stage's output properties +// to the given DAG. +func (e *executor) addOutputProperties( + stage step.LifecycleStageWithSchema, stepID string, runnableStep step.RunnableStep, + dag dgraph.DirectedGraph[*DAGItem], stepNode dgraph.Node[*DAGItem], + stageOutputProperties map[string]*schema.PropertySchema) (*schema.ObjectSchema, error) { - outputProperties[stage.ID] = schema.NewPropertySchema( - stageOutputs, - nil, - true, + for outputID, outputSchema := range stage.Outputs { + stageDAGItem := &DAGItem{ + Kind: DAGItemKindStepStageOutput, + StepID: stepID, + StageID: stage.ID, + OutputID: outputID, + Provider: runnableStep, + } + stageNode, err := dag.AddNode(stageDAGItem.String(), stageDAGItem) + if err != nil { + return nil, fmt.Errorf( + "failed to add output %s for stage %s in step %s to DAG (%w)", + outputID, + stage.ID, + stepID, + err, + ) + } + if err := stepNode.Connect(stageNode.ID()); err != nil { + return nil, fmt.Errorf( + "failed to connect stage %s to its output %s in step %s (%w)", + stage.ID, outputID, stepID, err, + ) + } + stageOutputProperties[outputID] = schema.NewPropertySchema( + outputSchema.Schema(), + outputSchema.Display(), + false, nil, nil, nil, @@ -603,7 +627,13 @@ func (e *executor) buildOutputProperties( nil, ) } - return outputProperties, nil + + stageOutputs := schema.NewObjectSchema( + GetStageNodeID(stepID, stage.ID), + stageOutputProperties, + ) + + return stageOutputs, nil } func (e *executor) getRunData(stepKind step.Provider, runnableStep step.RunnableStep, stepID string, stepDataMap map[any]any) (map[string]any, error) { diff --git a/workflow/executor_test.go b/workflow/executor_test.go index 9615b4f5..8fe5299f 100644 --- a/workflow/executor_test.go +++ b/workflow/executor_test.go @@ -234,3 +234,50 @@ func TestMismatchedInputTypes(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "unsupported data type for 'int' type: *schema.StringSchema") } + +var invalidWaitfor = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + wait_1: + plugin: + src: "n/a" + deployment_type: builtin + step: wait + input: + wait_time_ms: 0 + wait_2: + plugin: + src: "n/a" + deployment_type: builtin + step: wait + input: + wait_time_ms: 0 + # invalid wait for specification + # specifically, deploy does not have any outputs + wait_for: !expr steps.wait_1.deploy +outputs: + a: + b: !expr $.steps.wait_2.outputs +` + +func TestDependOnNoOutputs(t *testing.T) { + // This test is to validate that this error is caught at workflow + // preparation instead of workflow execution. + + // The error handling does not currently distinguish between the edge cases: + // - wait_1 = {}; not having a property named 'deploy', + // - wait_1 = { deploy: nil }; the 'deploy' property has no outputs (i.e. nil output) + // + // This is not a robust test. If it continues to break, it should be improved, or removed. + // To improve this test the engine needs to improve observability + // into the workflow's expression path data structure at preparation time. + _, err := getTestImplPreparedWorkflow(t, invalidWaitfor) + assert.Error(t, err) + assert.Contains(t, err.Error(), "object wait_1 does not have a property named deploy") +}