Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait For validation #133

Merged
merged 5 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 91 additions & 61 deletions workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,42 +92,44 @@ type executor struct {
// 5. We can now construct the output data model of the workflow.
func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte) (ExecutableWorkflow, error) {
dag := dgraph.New[*DAGItem]()
if _, err := dag.AddNode("input", &DAGItem{
if _, err := dag.AddNode(WorkflowInputKey, &DAGItem{
Kind: "input",
}); err != nil {
return nil, fmt.Errorf("failed to add input node (%w)", err)
}

// First we unserialize the input schema
// Stage 1: Unserialize the input schema.
typedInput, err := e.processInput(workflow)
if err != nil {
return nil, err
}

// Then we process the steps. This involves several sub-steps, make sure to check the function.
// Stage 2: Process the steps. This involves several sub-steps, make sure
// to check the function.
runnableSteps, stepOutputProperties, stepLifecycles, stepRunData, err := e.processSteps(workflow, dag, workflowContext)
if err != nil {
return nil, err
}

// Now we can construct an internal data model for the output data model provided by the steps. This is the schema
// the expressions evaluate against. You can use this to do static code analysis on the expressions.
// Stage 3: Construct an internal data model for the output data model
// provided by the steps. This is the schema the expressions evaluate
// against. You can use this to do static code analysis on the expressions.
internalDataModel := e.buildInternalDataModel(typedInput, stepOutputProperties)

// Stage 4: build the DAG dependencies.
// Stage 4: Build the DAG dependencies.
if err := e.connectStepDependencies(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil {
return nil, err
}

// Stage 5: Verify stage inputs
// Now that the output properties are here and the internal data model is here, it should be possible to loop
// through them again to verify that all inputs are valid. So verify that all required inputs are present, schemas
// are valid, etc.
// Do this by looping through the steps' inputs, then verifying that the dag can provide them.
if err := e.verifyWorkflowStageInputs(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil {
// Stage 5: Classify stage inputs.
// Use workflow steps, life cycles, and DAG, as an input (string) into a
// finite state machine Classifier. The input will either be
// accepted (nil), or it will be rejected with one of the error states.
if err := e.classifyWorkflowStageInputs(workflow, workflowContext, stepLifecycles, dag, internalDataModel); err != nil {
return nil, err
}
// Stage 6: The output data model

// Stage 6: Output data model.
//goland:noinspection GoDeprecation
if workflow.Output != nil {
if len(workflow.Outputs) > 0 {
Expand Down Expand Up @@ -171,8 +173,10 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte
}
}

// We don't like cycles as we can't execute them properly. Maybe we can improve this later to actually output the
// cycle to help the user?
// Stage 7: Check DAG acyclicity.
// We don't like cycles as we can't execute them properly.
// Maybe we can improve this later to actually output the cycle to
// help the user?
if dag.HasCycles() {
return nil, fmt.Errorf("your workflow has a cycle")
}
Expand Down Expand Up @@ -314,13 +318,13 @@ func (e *executor) connectStepDependencies(
stageData[inputField] = data
}
if err := e.prepareDependencies(workflowContext, data, currentStageNode, internalDataModel, dag); err != nil {
return fmt.Errorf("failed to build dependency tree for %s (%w)", currentStageNode.ID(), err)
return fmt.Errorf("failed to build dependency tree for '%s' (%w)", currentStageNode.ID(), err)
}
}
currentStageNode.Item().Data = stageData
if len(stage.InputSchema) > 0 {
currentStageNode.Item().DataSchema = schema.NewObjectSchema(
"input",
WorkflowInputKey,
stage.InputSchema,
)
}
Expand All @@ -329,15 +333,19 @@ func (e *executor) connectStepDependencies(
return nil
}

// verifyWorkflowStageInputs verifies the schemas of the step inputs.
func (e *executor) verifyWorkflowStageInputs(
// classifyWorkflowStageInputs uses workflow steps, life cycles, and DAG,
// as an input (string) into a finite state machine Classifier.
// The function returns whether the input was accepted (nil),
// or rejected with one of the error states.
func (e *executor) classifyWorkflowStageInputs(
workflow *Workflow,
workflowContext map[string][]byte,
stepLifecycles map[string]step.Lifecycle[step.LifecycleStageWithSchema],
dag dgraph.DirectedGraph[*DAGItem],
internalDataModel *schema.ScopeSchema,
) error {
// Loop through all the steps in the engine
// Looping through the steps' inputs, then verify that the
// dag can provide them.
for stepID /*stepData is the unused key*/ := range workflow.Steps {
// Then loop through the stages of that step
lifecycle := stepLifecycles[stepID]
Expand Down Expand Up @@ -383,7 +391,7 @@ func (e *executor) verifyStageInputs(
// It is present, so make sure it is compatible.
err := e.preValidateCompatibility(internalDataModel, providedInputForField, stageInputSchema, workflowContext)
if err != nil {
return fmt.Errorf("input validation failed for workflow step %s stage %s (%w)", stepID, stage.ID, err)
return fmt.Errorf("input validation failed for workflow step '%s' stage '%s' (%w)", stepID, stage.ID, err)
}
}
}
Expand Down Expand Up @@ -493,7 +501,7 @@ func (e *executor) buildInternalDataModel(input schema.Scope, stepOutputProperti
schema.NewObjectSchema(
"workflow",
map[string]*schema.PropertySchema{
"input": schema.NewPropertySchema(
WorkflowInputKey: schema.NewPropertySchema(
input,
schema.NewDisplayValue(
schema.PointerTo("Input"),
Expand All @@ -507,9 +515,9 @@ func (e *executor) buildInternalDataModel(input schema.Scope, stepOutputProperti
nil,
nil,
),
"steps": schema.NewPropertySchema(
WorkflowStepsKey: schema.NewPropertySchema(
schema.NewObjectSchema(
"steps",
WorkflowStepsKey,
stepOutputProperties,
),
nil,
Expand Down Expand Up @@ -550,60 +558,82 @@ func (e *executor) buildOutputProperties(
err,
)
}
stageOutputProperties := make(map[string]*schema.PropertySchema, len(stage.Outputs))
for outputID, outputSchema := range stage.Outputs {
stageDAGItem := &DAGItem{
Kind: DAGItemKindStepStageOutput,
StepID: stepID,
StageID: stage.ID,
OutputID: outputID,
Provider: runnableStep,
}
stageNode, err := dag.AddNode(stageDAGItem.String(), stageDAGItem)
if err != nil {
return nil, fmt.Errorf(
"failed to add output %s for stage %s in step %s to DAG (%w)",
outputID,
stage.ID,
stepID,
err,
)
}
if err := stepNode.Connect(stageNode.ID()); err != nil {
return nil, fmt.Errorf(
"failed to connect stage %s to its output %s in step %s (%w)",
stage.ID, outputID, stepID, err,
)

stageOutputsLen := len(stage.Outputs)

// only add stages with outputs to the DAG
if stageOutputsLen > 0 {
stageOutputProperties := make(map[string]*schema.PropertySchema, stageOutputsLen)
stageOutputs, err2 := e.addOutputProperties(
stage, stepID, runnableStep, dag, stepNode,
stageOutputProperties)
if err2 != nil {
return nil, err2
}
stageOutputProperties[outputID] = schema.NewPropertySchema(
outputSchema.Schema(),
outputSchema.Display(),
false,
outputProperties[stage.ID] = schema.NewPropertySchema(
stageOutputs,
nil,
true,
nil,
nil,
nil,
nil,
nil,
)
}
}
return outputProperties, nil
}

stageOutputs := schema.NewObjectSchema(
GetStageNodeID(stepID, stage.ID),
stageOutputProperties,
)
// addOutputProperties adds a step's stage's output properties
// to the given DAG.
func (e *executor) addOutputProperties(
stage step.LifecycleStageWithSchema, stepID string, runnableStep step.RunnableStep,
dag dgraph.DirectedGraph[*DAGItem], stepNode dgraph.Node[*DAGItem],
stageOutputProperties map[string]*schema.PropertySchema) (*schema.ObjectSchema, error) {

outputProperties[stage.ID] = schema.NewPropertySchema(
stageOutputs,
nil,
true,
for outputID, outputSchema := range stage.Outputs {
stageDAGItem := &DAGItem{
Kind: DAGItemKindStepStageOutput,
StepID: stepID,
StageID: stage.ID,
OutputID: outputID,
Provider: runnableStep,
}
stageNode, err := dag.AddNode(stageDAGItem.String(), stageDAGItem)
if err != nil {
return nil, fmt.Errorf(
"failed to add output %s for stage %s in step %s to DAG (%w)",
outputID,
stage.ID,
stepID,
err,
)
}
if err := stepNode.Connect(stageNode.ID()); err != nil {
return nil, fmt.Errorf(
"failed to connect stage %s to its output %s in step %s (%w)",
stage.ID, outputID, stepID, err,
)
}
stageOutputProperties[outputID] = schema.NewPropertySchema(
outputSchema.Schema(),
outputSchema.Display(),
false,
nil,
nil,
nil,
nil,
nil,
)
}
return outputProperties, nil

stageOutputs := schema.NewObjectSchema(
GetStageNodeID(stepID, stage.ID),
stageOutputProperties,
)

return stageOutputs, nil
}

func (e *executor) getRunData(stepKind step.Provider, runnableStep step.RunnableStep, stepID string, stepDataMap map[any]any) (map[string]any, error) {
Expand Down
47 changes: 47 additions & 0 deletions workflow/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,50 @@ func TestMismatchedInputTypes(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "unsupported data type for 'int' type: *schema.StringSchema")
}

var invalidWaitfor = `
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
wait_1:
plugin:
src: "n/a"
deployment_type: builtin
step: wait
input:
wait_time_ms: 0
wait_2:
plugin:
src: "n/a"
deployment_type: builtin
step: wait
input:
wait_time_ms: 0
# invalid wait for specification
# specifically, deploy does not have any outputs
wait_for: !expr steps.wait_1.deploy
outputs:
a:
b: !expr $.steps.wait_2.outputs
`

func TestDependOnNoOutputs(t *testing.T) {
// This test is to validate that this error is caught at workflow
// preparation instead of workflow execution.

// The error handling does not currently distinguish between the edge cases:
// - wait_1 = {}; not having a property named 'deploy',
// - wait_1 = { deploy: nil }; the 'deploy' property has no outputs (i.e. nil output)
//
// This is not a robust test. If it continues to break, it should be improved, or removed.
// To improve this test the engine needs to improve observability
// into the workflow's expression path data structure at preparation time.
_, err := getTestImplPreparedWorkflow(t, invalidWaitfor)
assert.Error(t, err)
assert.Contains(t, err.Error(), "object wait_1 does not have a property named deploy")
}
Loading