Skip to content

Commit

Permalink
Wait For validation (#133)
Browse files Browse the repository at this point in the history
* tdd wait for validation at workflow preparation instead of execution

* test for use of illegal property in workflow

* finish changes

* add comment to test

* add some comments for clarification
  • Loading branch information
mfleader authored Dec 6, 2023
1 parent e5f2416 commit ffc9891
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 61 deletions.
152 changes: 91 additions & 61 deletions workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,42 +92,44 @@ type executor struct {
// 5. We can now construct the output data model of the workflow.
func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte) (ExecutableWorkflow, error) {
dag := dgraph.New[*DAGItem]()
if _, err := dag.AddNode("input", &DAGItem{
if _, err := dag.AddNode(WorkflowInputKey, &DAGItem{
Kind: "input",
}); err != nil {
return nil, fmt.Errorf("failed to add input node (%w)", err)
}

// First we unserialize the input schema
// Stage 1: Unserialize the input schema.
typedInput, err := e.processInput(workflow)
if err != nil {
return nil, err
}

// Then we process the steps. This involves several sub-steps, make sure to check the function.
// Stage 2: Process the steps. This involves several sub-steps, make sure
// to check the function.
runnableSteps, stepOutputProperties, stepLifecycles, stepRunData, err := e.processSteps(workflow, dag, workflowContext)
if err != nil {
return nil, err
}

// Now we can construct an internal data model for the output data model provided by the steps. This is the schema
// the expressions evaluate against. You can use this to do static code analysis on the expressions.
// Stage 3: Construct an internal data model for the output data model
// provided by the steps. This is the schema the expressions evaluate
// against. You can use this to do static code analysis on the expressions.
internalDataModel := e.buildInternalDataModel(typedInput, stepOutputProperties)

// Stage 4: build the DAG dependencies.
// Stage 4: Build the DAG dependencies.
if err := e.connectStepDependencies(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil {
return nil, err
}

// Stage 5: Verify stage inputs
// Now that the output properties are here and the internal data model is here, it should be possible to loop
// through them again to verify that all inputs are valid. So verify that all required inputs are present, schemas
// are valid, etc.
// Do this by looping through the steps' inputs, then verifying that the dag can provide them.
if err := e.verifyWorkflowStageInputs(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil {
// Stage 5: Classify stage inputs.
// Use workflow steps, life cycles, and DAG, as an input (string) into a
// finite state machine Classifier. The input will either be
// accepted (nil), or it will be rejected with one of the error states.
if err := e.classifyWorkflowStageInputs(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil {
return nil, err
}
// Stage 6: The output data model

// Stage 6: Output data model.
//goland:noinspection GoDeprecation
if workflow.Output != nil {
if len(workflow.Outputs) > 0 {
Expand Down Expand Up @@ -171,8 +173,10 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte
}
}

// We don't like cycles as we can't execute them properly. Maybe we can improve this later to actually output the
// cycle to help the user?
// Stage 7: Check DAG acyclicity.
// We don't like cycles as we can't execute them properly.
// Maybe we can improve this later to actually output the cycle to
// help the user?
if dag.HasCycles() {
return nil, fmt.Errorf("your workflow has a cycle")
}
Expand Down Expand Up @@ -314,13 +318,13 @@ func (e *executor) connectStepDependencies(
stageData[inputField] = data
}
if err := e.prepareDependencies(workflowContext, data, currentStageNode, internalDataModel, dag); err != nil {
return fmt.Errorf("failed to build dependency tree for %s (%w)", currentStageNode.ID(), err)
return fmt.Errorf("failed to build dependency tree for '%s' (%w)", currentStageNode.ID(), err)
}
}
currentStageNode.Item().Data = stageData
if len(stage.InputSchema) > 0 {
currentStageNode.Item().DataSchema = schema.NewObjectSchema(
"input",
WorkflowInputKey,
stage.InputSchema,
)
}
Expand All @@ -329,15 +333,19 @@ func (e *executor) connectStepDependencies(
return nil
}

// verifyWorkflowStageInputs verifies the schemas of the step inputs.
func (e *executor) verifyWorkflowStageInputs(
// classifyWorkflowStageInputs uses workflow steps, life cycles, and DAG,
// as an input (string) into a finite state machine Classifier.
// The function returns whether the input was accepted (nil),
// or rejected with one of the error states.
func (e *executor) classifyWorkflowStageInputs(
workflow *Workflow,
workflowContext map[string][]byte,
stepLifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema],
dag dgraph.DirectedGraph[*DAGItem],
internalDataModel *schema.ScopeSchema,
) error {
// Loop through all the steps in the engine
// Looping through the steps' inputs, then verify that the
// dag can provide them.
for stepID /*stepData is the unused key*/ := range workflow.Steps {
// Then loop through the stages of that step
lifecycle := stepLifecycles[stepID]
Expand Down Expand Up @@ -383,7 +391,7 @@ func (e *executor) verifyStageInputs(
// It is present, so make sure it is compatible.
err := e.preValidateCompatibility(internalDataModel, providedInputForField, stageInputSchema, workflowContext)
if err != nil {
return fmt.Errorf("input validation failed for workflow step %s stage %s (%w)", stepID, stage.ID, err)
return fmt.Errorf("input validation failed for workflow step '%s' stage '%s' (%w)", stepID, stage.ID, err)
}
}
}
Expand Down Expand Up @@ -493,7 +501,7 @@ func (e *executor) buildInternalDataModel(input schema.Scope, stepOutputProperti
schema.NewObjectSchema(
"workflow",
map[string]*schema.PropertySchema{
"input": schema.NewPropertySchema(
WorkflowInputKey: schema.NewPropertySchema(
input,
schema.NewDisplayValue(
schema.PointerTo("Input"),
Expand All @@ -507,9 +515,9 @@ func (e *executor) buildInternalDataModel(input schema.Scope, stepOutputProperti
nil,
nil,
),
"steps": schema.NewPropertySchema(
WorkflowStepsKey: schema.NewPropertySchema(
schema.NewObjectSchema(
"steps",
WorkflowStepsKey,
stepOutputProperties,
),
nil,
Expand Down Expand Up @@ -550,60 +558,82 @@ func (e *executor) buildOutputProperties(
err,
)
}
stageOutputProperties := make(map[string]*schema.PropertySchema, len(stage.Outputs))
for outputID, outputSchema := range stage.Outputs {
stageDAGItem := &DAGItem{
Kind: DAGItemKindStepStageOutput,
StepID: stepID,
StageID: stage.ID,
OutputID: outputID,
Provider: runnableStep,
}
stageNode, err := dag.AddNode(stageDAGItem.String(), stageDAGItem)
if err != nil {
return nil, fmt.Errorf(
"failed to add output %s for stage %s in step %s to DAG (%w)",
outputID,
stage.ID,
stepID,
err,
)
}
if err := stepNode.Connect(stageNode.ID()); err != nil {
return nil, fmt.Errorf(
"failed to connect stage %s to its output %s in step %s (%w)",
stage.ID, outputID, stepID, err,
)

stageOutputsLen := len(stage.Outputs)

// only add stages with outputs to the DAG
if stageOutputsLen > 0 {
stageOutputProperties := make(map[string]*schema.PropertySchema, stageOutputsLen)
stageOutputs, err2 := e.addOutputProperties(
stage, stepID, runnableStep, dag, stepNode,
stageOutputProperties)
if err2 != nil {
return nil, err2
}
stageOutputProperties[outputID] = schema.NewPropertySchema(
outputSchema.Schema(),
outputSchema.Display(),
false,
outputProperties[stage.ID] = schema.NewPropertySchema(
stageOutputs,
nil,
true,
nil,
nil,
nil,
nil,
nil,
)
}
}
return outputProperties, nil
}

stageOutputs := schema.NewObjectSchema(
GetStageNodeID(stepID, stage.ID),
stageOutputProperties,
)
// addOutputProperties adds a step's stage's output properties
// to the given DAG.
func (e *executor) addOutputProperties(
stage step.LifecycleStageWithSchema, stepID string, runnableStep step.RunnableStep,
dag dgraph.DirectedGraph[*DAGItem], stepNode dgraph.Node[*DAGItem],
stageOutputProperties map[string]*schema.PropertySchema) (*schema.ObjectSchema, error) {

outputProperties[stage.ID] = schema.NewPropertySchema(
stageOutputs,
nil,
true,
for outputID, outputSchema := range stage.Outputs {
stageDAGItem := &DAGItem{
Kind: DAGItemKindStepStageOutput,
StepID: stepID,
StageID: stage.ID,
OutputID: outputID,
Provider: runnableStep,
}
stageNode, err := dag.AddNode(stageDAGItem.String(), stageDAGItem)
if err != nil {
return nil, fmt.Errorf(
"failed to add output %s for stage %s in step %s to DAG (%w)",
outputID,
stage.ID,
stepID,
err,
)
}
if err := stepNode.Connect(stageNode.ID()); err != nil {
return nil, fmt.Errorf(
"failed to connect stage %s to its output %s in step %s (%w)",
stage.ID, outputID, stepID, err,
)
}
stageOutputProperties[outputID] = schema.NewPropertySchema(
outputSchema.Schema(),
outputSchema.Display(),
false,
nil,
nil,
nil,
nil,
nil,
)
}
return outputProperties, nil

stageOutputs := schema.NewObjectSchema(
GetStageNodeID(stepID, stage.ID),
stageOutputProperties,
)

return stageOutputs, nil
}

func (e *executor) getRunData(stepKind step.Provider, runnableStep step.RunnableStep, stepID string, stepDataMap map[any]any) (map[string]any, error) {
Expand Down
47 changes: 47 additions & 0 deletions workflow/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,50 @@ func TestMismatchedInputTypes(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "unsupported data type for 'int' type: *schema.StringSchema")
}

var invalidWaitfor = `
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
wait_1:
plugin:
src: "n/a"
deployment_type: builtin
step: wait
input:
wait_time_ms: 0
wait_2:
plugin:
src: "n/a"
deployment_type: builtin
step: wait
input:
wait_time_ms: 0
# invalid wait for specification
# specifically, deploy does not have any outputs
wait_for: !expr steps.wait_1.deploy
outputs:
a:
b: !expr $.steps.wait_2.outputs
`

func TestDependOnNoOutputs(t *testing.T) {
// This test is to validate that this error is caught at workflow
// preparation instead of workflow execution.

// The error handling does not currently distinguish between the edge cases:
// - wait_1 = {}; not having a property named 'deploy',
// - wait_1 = { deploy: nil }; the 'deploy' property has no outputs (i.e. nil output)
//
// This is not a robust test. If it continues to break, it should be improved, or removed.
// To improve this test the engine needs to improve observability
// into the workflow's expression path data structure at preparation time.
_, err := getTestImplPreparedWorkflow(t, invalidWaitfor)
assert.Error(t, err)
assert.Contains(t, err.Error(), "object wait_1 does not have a property named deploy")
}

0 comments on commit ffc9891

Please sign in to comment.