From 4e407fa9b948b57cbd1c6f6cfdbf07a1ddd0159e Mon Sep 17 00:00:00 2001 From: Matthew F Leader Date: Wed, 29 Nov 2023 18:33:36 -0500 Subject: [PATCH 1/5] tdd wait for validation at workflow preparation instead of execution --- workflow/executor.go | 2 +- workflow/executor_test.go | 42 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/workflow/executor.go b/workflow/executor.go index bcb02c26..07c1aa1e 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -383,7 +383,7 @@ func (e *executor) verifyStageInputs( // It is present, so make sure it is compatible. err := e.preValidateCompatibility(internalDataModel, providedInputForField, stageInputSchema, workflowContext) if err != nil { - return fmt.Errorf("input validation failed for workflow step %s stage %s (%w)", stepID, stage.ID, err) + return fmt.Errorf("input validation failed for workflow step '%s' stage '%s' (%w)", stepID, stage.ID, err) } } } diff --git a/workflow/executor_test.go b/workflow/executor_test.go index 9615b4f5..4abaa5a8 100644 --- a/workflow/executor_test.go +++ b/workflow/executor_test.go @@ -234,3 +234,45 @@ func TestMismatchedInputTypes(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "unsupported data type for 'int' type: *schema.StringSchema") } + +var invalidWaitfor = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + wait_1: + plugin: + src: "n/a" + deployment_type: builtin + step: wait + input: + wait_time_ms: 0 + wait_2: + plugin: + src: "n/a" + deployment_type: builtin + step: wait + input: + wait_time_ms: 0 + # invalid wait for specification + # specifically, deploy does not have any outputs + wait_for: !expr steps.wait_1.deploy +outputs: + a: + b: !expr $.steps.wait_2.outputs +` + +func TestDependOnNoOutputs(t *testing.T) { + // This test is to validate that this error is caught at workflow + // preparation instead of workflow execution. + // panic: cannot resolve expressions for steps.wait_2.starting + // (failed to resolve workflow map expressions (map key deploy not found)) + _, err := getTestImplPreparedWorkflow(t, invalidWaitfor) + assert.Error(t, err) + //_, _, err = preparedWorkflow.Execute(context.Background(), map[string]any{}) + //assert.NoError(t, err) +} From 3ed144045374e6cc0038dd831a894a2ed8970cbd Mon Sep 17 00:00:00 2001 From: Matthew F Leader Date: Fri, 1 Dec 2023 17:07:11 -0500 Subject: [PATCH 2/5] test for use of illegal property in workflow --- workflow/executor.go | 20 +++++++++++++------- workflow/executor_test.go | 8 ++++---- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/workflow/executor.go b/workflow/executor.go index 07c1aa1e..a5e0c0ed 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -92,7 +92,7 @@ type executor struct { // 5. We can now construct the output data model of the workflow. func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte) (ExecutableWorkflow, error) { dag := dgraph.New[*DAGItem]() - if _, err := dag.AddNode("input", &DAGItem{ + if _, err := dag.AddNode(WorkflowInputKey, &DAGItem{ Kind: "input", }); err != nil { return nil, fmt.Errorf("failed to add input node (%w)", err) @@ -314,13 +314,13 @@ func (e *executor) connectStepDependencies( stageData[inputField] = data } if err := e.prepareDependencies(workflowContext, data, currentStageNode, internalDataModel, dag); err != nil { - return fmt.Errorf("failed to build dependency tree for %s (%w)", currentStageNode.ID(), err) + return fmt.Errorf("failed to build dependency tree for '%s' (%w)", currentStageNode.ID(), err) } } currentStageNode.Item().Data = stageData if len(stage.InputSchema) > 0 { currentStageNode.Item().DataSchema = schema.NewObjectSchema( - "input", + WorkflowInputKey, stage.InputSchema, ) } @@ -493,7 +493,7 @@ func (e *executor) buildInternalDataModel(input schema.Scope, stepOutputProperti schema.NewObjectSchema( "workflow", map[string]*schema.PropertySchema{ - "input": schema.NewPropertySchema( + WorkflowInputKey: schema.NewPropertySchema( input, schema.NewDisplayValue( schema.PointerTo("Input"), @@ -507,9 +507,9 @@ func (e *executor) buildInternalDataModel(input schema.Scope, stepOutputProperti nil, nil, ), - "steps": schema.NewPropertySchema( + WorkflowStepsKey: schema.NewPropertySchema( schema.NewObjectSchema( - "steps", + WorkflowStepsKey, stepOutputProperties, ), nil, @@ -550,7 +550,13 @@ func (e *executor) buildOutputProperties( err, ) } - stageOutputProperties := make(map[string]*schema.PropertySchema, len(stage.Outputs)) + + stageOutputsLen := len(stage.Outputs) + if stageOutputsLen == 0 { + continue + } + stageOutputProperties := make(map[string]*schema.PropertySchema, stageOutputsLen) + for outputID, outputSchema := range stage.Outputs { stageDAGItem := &DAGItem{ Kind: DAGItemKindStepStageOutput, diff --git a/workflow/executor_test.go b/workflow/executor_test.go index 4abaa5a8..165c5f28 100644 --- a/workflow/executor_test.go +++ b/workflow/executor_test.go @@ -269,10 +269,10 @@ outputs: func TestDependOnNoOutputs(t *testing.T) { // This test is to validate that this error is caught at workflow // preparation instead of workflow execution. - // panic: cannot resolve expressions for steps.wait_2.starting - // (failed to resolve workflow map expressions (map key deploy not found)) + // The error handling does not currently distinguish between the edge cases: + // - wait_1 = {}; not having a property named 'deploy', + // - wait_1 = { deploy: nil }; the 'deploy' property has no outputs (i.e. nil output) _, err := getTestImplPreparedWorkflow(t, invalidWaitfor) assert.Error(t, err) - //_, _, err = preparedWorkflow.Execute(context.Background(), map[string]any{}) - //assert.NoError(t, err) + assert.Contains(t, err.Error(), "object wait_1 does not have a property named deploy") } From c2f203a680f6ffd9f9b6bc17ac754b6cc44e1d42 Mon Sep 17 00:00:00 2001 From: Matthew F Leader Date: Tue, 5 Dec 2023 13:06:40 -0500 Subject: [PATCH 3/5] finish changes --- workflow/executor.go | 108 +++++++++++++++++++++++-------------------- 1 file changed, 59 insertions(+), 49 deletions(-) diff --git a/workflow/executor.go b/workflow/executor.go index a5e0c0ed..74c89542 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -550,58 +550,52 @@ func (e *executor) buildOutputProperties( err, ) } - + stageOutputsLen := len(stage.Outputs) - if stageOutputsLen == 0 { - continue - } - stageOutputProperties := make(map[string]*schema.PropertySchema, stageOutputsLen) - - for outputID, outputSchema := range stage.Outputs { - stageDAGItem := &DAGItem{ - Kind: DAGItemKindStepStageOutput, - StepID: stepID, - StageID: stage.ID, - OutputID: outputID, - Provider: runnableStep, - } - stageNode, err := dag.AddNode(stageDAGItem.String(), stageDAGItem) - if err != nil { - return nil, fmt.Errorf( - "failed to add output %s for stage %s in step %s to DAG (%w)", - outputID, - stage.ID, - stepID, - err, - ) - } - if err := stepNode.Connect(stageNode.ID()); err != nil { - return nil, fmt.Errorf( - "failed to connect stage %s to its output %s in step %s (%w)", - stage.ID, outputID, stepID, err, - ) + + if stageOutputsLen > 0 { + // only add stages with outputs + stageOutputProperties := make(map[string]*schema.PropertySchema, stageOutputsLen) + m, err2 := e.addOutputProperties( + stage, stepID, runnableStep, dag, stepNode, + stageOutputProperties, outputProperties) + if err2 != nil { + return m, err2 } - stageOutputProperties[outputID] = schema.NewPropertySchema( - outputSchema.Schema(), - outputSchema.Display(), - false, - nil, - nil, - nil, - nil, - nil, - ) } + } + return outputProperties, nil +} - stageOutputs := schema.NewObjectSchema( - GetStageNodeID(stepID, stage.ID), - stageOutputProperties, - ) - - outputProperties[stage.ID] = schema.NewPropertySchema( - stageOutputs, - nil, - true, +func (e *executor) addOutputProperties(stage step.LifecycleStageWithSchema, stepID string, runnableStep step.RunnableStep, dag dgraph.DirectedGraph[*DAGItem], stepNode dgraph.Node[*DAGItem], stageOutputProperties map[string]*schema.PropertySchema, outputProperties map[string]*schema.PropertySchema) (map[string]*schema.PropertySchema, error) { + for outputID, outputSchema := range stage.Outputs { + stageDAGItem := &DAGItem{ + Kind: DAGItemKindStepStageOutput, + StepID: stepID, + StageID: stage.ID, + OutputID: outputID, + Provider: runnableStep, + } + stageNode, err := dag.AddNode(stageDAGItem.String(), stageDAGItem) + if err != nil { + return nil, fmt.Errorf( + "failed to add output %s for stage %s in step %s to DAG (%w)", + outputID, + stage.ID, + stepID, + err, + ) + } + if err := stepNode.Connect(stageNode.ID()); err != nil { + return nil, fmt.Errorf( + "failed to connect stage %s to its output %s in step %s (%w)", + stage.ID, outputID, stepID, err, + ) + } + stageOutputProperties[outputID] = schema.NewPropertySchema( + outputSchema.Schema(), + outputSchema.Display(), + false, nil, nil, nil, @@ -609,7 +603,23 @@ func (e *executor) buildOutputProperties( nil, ) } - return outputProperties, nil + + stageOutputs := schema.NewObjectSchema( + GetStageNodeID(stepID, stage.ID), + stageOutputProperties, + ) + + outputProperties[stage.ID] = schema.NewPropertySchema( + stageOutputs, + nil, + true, + nil, + nil, + nil, + nil, + nil, + ) + return nil, nil } func (e *executor) getRunData(stepKind step.Provider, runnableStep step.RunnableStep, stepID string, stepDataMap map[any]any) (map[string]any, error) { From 54a73f3e0744527100d1ac3943d043a3bbadc73c Mon Sep 17 00:00:00 2001 From: Matthew F Leader Date: Tue, 5 Dec 2023 13:14:40 -0500 Subject: [PATCH 4/5] add comment to test --- workflow/executor.go | 34 +++++++++++++++++++--------------- workflow/executor_test.go | 5 +++++ 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/workflow/executor.go b/workflow/executor.go index 74c89542..ec6d7153 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -556,18 +556,32 @@ func (e *executor) buildOutputProperties( if stageOutputsLen > 0 { // only add stages with outputs stageOutputProperties := make(map[string]*schema.PropertySchema, stageOutputsLen) - m, err2 := e.addOutputProperties( + stageOutputs, err2 := e.addOutputProperties( stage, stepID, runnableStep, dag, stepNode, - stageOutputProperties, outputProperties) + stageOutputProperties) if err2 != nil { - return m, err2 + return nil, err2 } + outputProperties[stage.ID] = schema.NewPropertySchema( + stageOutputs, + nil, + true, + nil, + nil, + nil, + nil, + nil, + ) } } return outputProperties, nil } -func (e *executor) addOutputProperties(stage step.LifecycleStageWithSchema, stepID string, runnableStep step.RunnableStep, dag dgraph.DirectedGraph[*DAGItem], stepNode dgraph.Node[*DAGItem], stageOutputProperties map[string]*schema.PropertySchema, outputProperties map[string]*schema.PropertySchema) (map[string]*schema.PropertySchema, error) { +func (e *executor) addOutputProperties( + stage step.LifecycleStageWithSchema, stepID string, runnableStep step.RunnableStep, + dag dgraph.DirectedGraph[*DAGItem], stepNode dgraph.Node[*DAGItem], + stageOutputProperties map[string]*schema.PropertySchema) (*schema.ObjectSchema, error) { + for outputID, outputSchema := range stage.Outputs { stageDAGItem := &DAGItem{ Kind: DAGItemKindStepStageOutput, @@ -609,17 +623,7 @@ func (e *executor) addOutputProperties(stage step.LifecycleStageWithSchema, step stageOutputProperties, ) - outputProperties[stage.ID] = schema.NewPropertySchema( - stageOutputs, - nil, - true, - nil, - nil, - nil, - nil, - nil, - ) - return nil, nil + return stageOutputs, nil } func (e *executor) getRunData(stepKind step.Provider, runnableStep step.RunnableStep, stepID string, stepDataMap map[any]any) (map[string]any, error) { diff --git a/workflow/executor_test.go b/workflow/executor_test.go index 165c5f28..86160ba5 100644 --- a/workflow/executor_test.go +++ b/workflow/executor_test.go @@ -269,9 +269,14 @@ outputs: func TestDependOnNoOutputs(t *testing.T) { // This test is to validate that this error is caught at workflow // preparation instead of workflow execution. + // The error handling does not currently distinguish between the edge cases: // - wait_1 = {}; not having a property named 'deploy', // - wait_1 = { deploy: nil }; the 'deploy' property has no outputs (i.e. nil output) + // + // This is not a robust test. It should be improved, or removed, if it continues to break. + // To improve this test the engine needs to improve observability + // into the workflow's path structure at preparation time. _, err := getTestImplPreparedWorkflow(t, invalidWaitfor) assert.Error(t, err) assert.Contains(t, err.Error(), "object wait_1 does not have a property named deploy") From f8d82c81b74cef24149955f97d299f5370461a59 Mon Sep 17 00:00:00 2001 From: Matthew F Leader Date: Wed, 6 Dec 2023 12:21:32 -0500 Subject: [PATCH 5/5] add some comments for clarification --- workflow/executor.go | 46 ++++++++++++++++++++++++--------------- workflow/executor_test.go | 4 ++-- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/workflow/executor.go b/workflow/executor.go index ec6d7153..e3b0d002 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -98,36 +98,38 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte return nil, fmt.Errorf("failed to add input node (%w)", err) } - // First we unserialize the input schema + // Stage 1: Unserialize the input schema. typedInput, err := e.processInput(workflow) if err != nil { return nil, err } - // Then we process the steps. This involves several sub-steps, make sure to check the function. + // Stage 2: Process the steps. This involves several sub-steps, make sure + // to check the function. runnableSteps, stepOutputProperties, stepLifecycles, stepRunData, err := e.processSteps(workflow, dag, workflowContext) if err != nil { return nil, err } - // Now we can construct an internal data model for the output data model provided by the steps. This is the schema - // the expressions evaluate against. You can use this to do static code analysis on the expressions. + // Stage 3: Construct an internal data model for the output data model + // provided by the steps. This is the schema the expressions evaluate + // against. You can use this to do static code analysis on the expressions. internalDataModel := e.buildInternalDataModel(typedInput, stepOutputProperties) - // Stage 4: build the DAG dependencies. + // Stage 4: Build the DAG dependencies. if err := e.connectStepDependencies(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil { return nil, err } - // Stage 5: Verify stage inputs - // Now that the output properties are here and the internal data model is here, it should be possible to loop - // through them again to verify that all inputs are valid. So verify that all required inputs are present, schemas - // are valid, etc. - // Do this by looping through the steps' inputs, then verifying that the dag can provide them. - if err := e.verifyWorkflowStageInputs(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil { + // Stage 5: Classify stage inputs. + // Use workflow steps, life cycles, and DAG, as an input (string) into a + // finite state machine Classifier. The input will either be + // accepted (nil), or it will be rejected with one of the error states. + if err := e.classifyWorkflowStageInputs(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil { return nil, err } - // Stage 6: The output data model + + // Stage 6: Output data model. //goland:noinspection GoDeprecation if workflow.Output != nil { if len(workflow.Outputs) > 0 { @@ -171,8 +173,10 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte } } - // We don't like cycles as we can't execute them properly. Maybe we can improve this later to actually output the - // cycle to help the user? + // Stage 7: Check DAG acyclicity. + // We don't like cycles as we can't execute them properly. + // Maybe we can improve this later to actually output the cycle to + // help the user? if dag.HasCycles() { return nil, fmt.Errorf("your workflow has a cycle") } @@ -329,15 +333,19 @@ func (e *executor) connectStepDependencies( return nil } -// verifyWorkflowStageInputs verifies the schemas of the step inputs. -func (e *executor) verifyWorkflowStageInputs( +// classifyWorkflowStageInputs uses workflow steps, life cycles, and DAG, +// as an input (string) into a finite state machine Classifier. +// The function returns whether the input was accepted (nil), +// or rejected with one of the error states. +func (e *executor) classifyWorkflowStageInputs( workflow *Workflow, workflowContext map[string][]byte, stepLifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema], dag dgraph.DirectedGraph[*DAGItem], internalDataModel *schema.ScopeSchema, ) error { - // Loop through all the steps in the engine + // Looping through the steps' inputs, then verify that the + // dag can provide them. for stepID /*stepData is the unused key*/ := range workflow.Steps { // Then loop through the stages of that step lifecycle := stepLifecycles[stepID] @@ -553,8 +561,8 @@ func (e *executor) buildOutputProperties( stageOutputsLen := len(stage.Outputs) + // only add stages with outputs to the DAG if stageOutputsLen > 0 { - // only add stages with outputs stageOutputProperties := make(map[string]*schema.PropertySchema, stageOutputsLen) stageOutputs, err2 := e.addOutputProperties( stage, stepID, runnableStep, dag, stepNode, @@ -577,6 +585,8 @@ func (e *executor) buildOutputProperties( return outputProperties, nil } +// addOutputProperties adds a step's stage's output properties +// to the given DAG. func (e *executor) addOutputProperties( stage step.LifecycleStageWithSchema, stepID string, runnableStep step.RunnableStep, dag dgraph.DirectedGraph[*DAGItem], stepNode dgraph.Node[*DAGItem], diff --git a/workflow/executor_test.go b/workflow/executor_test.go index 86160ba5..8fe5299f 100644 --- a/workflow/executor_test.go +++ b/workflow/executor_test.go @@ -274,9 +274,9 @@ func TestDependOnNoOutputs(t *testing.T) { // - wait_1 = {}; not having a property named 'deploy', // - wait_1 = { deploy: nil }; the 'deploy' property has no outputs (i.e. nil output) // - // This is not a robust test. It should be improved, or removed, if it continues to break. + // This is not a robust test. If it continues to break, it should be improved, or removed. // To improve this test the engine needs to improve observability - // into the workflow's path structure at preparation time. + // into the workflow's expression path data structure at preparation time. _, err := getTestImplPreparedWorkflow(t, invalidWaitfor) assert.Error(t, err) assert.Contains(t, err.Error(), "object wait_1 does not have a property named deploy")