From 2d0d8f217f92df697bee172ee0cc0c6866f5d2ae Mon Sep 17 00:00:00 2001 From: Matt Leader Date: Mon, 11 Mar 2024 17:18:42 -0400 Subject: [PATCH] fix output schema bug (#163) * fix output schema type in workflow model schema * add output schema happy path test * add unserialize test * add output schema error path tests --- workflow/executor.go | 10 +++--- workflow/executor_test.go | 39 ++++++++++++++++++++++ workflow/model.go | 6 ++-- workflow/model_test.go | 69 +++++++++++++++++++++++++++++++++++++++ workflow/workflow_test.go | 51 +++++++++++++++++++++++++++++ workflow/yaml.go | 2 +- 6 files changed, 168 insertions(+), 9 deletions(-) create mode 100644 workflow/model_test.go diff --git a/workflow/executor.go b/workflow/executor.go index dc87ef35..81e2d004 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -155,12 +155,12 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte outputsSchema := map[string]*schema.StepOutputSchema{} for outputID, outputData := range workflow.Outputs { var outputSchema *schema.StepOutputSchema - if workflow.OutputSchema != nil && workflow.OutputSchema[outputID] != nil { - outputSchemaData, err := schema.DescribeStepOutput().Unserialize(workflow.OutputSchema[outputID]) - if err != nil { - return nil, fmt.Errorf("unable to decode workflow output schema %s (%w)", outputID, err) + if workflow.OutputSchema != nil { + outputSchemaData, ok := workflow.OutputSchema[outputID] + if !ok { + return nil, fmt.Errorf("could not find output id %q in output schema", outputID) } - outputSchema = outputSchemaData.(*schema.StepOutputSchema) + outputSchema = outputSchemaData } outputSchema, err = infer.OutputSchema( outputData, diff --git a/workflow/executor_test.go b/workflow/executor_test.go index fdb90634..4744bb24 100644 --- a/workflow/executor_test.go +++ b/workflow/executor_test.go @@ -284,3 +284,42 @@ func TestDependOnNoOutputs(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), `object wait_1 
does not have a property named "deploy"`) } + +var workflowWithOutputSchema = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + long_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 1 +outputs: + success: + first_step_output: !expr $.steps.long_wait.outputs +outputSchema: + missingno: + schema: + root: RootObjectOut + objects: + RootObjectOut: + id: RootObjectOut + properties: + message: + type: + type_id: string` + +func TestWorkflow_Execute_Error_OutputSchema_OutputKey(t *testing.T) { + _, err := createTestExecutableWorkflow(t, workflowWithOutputSchema, map[string][]byte{}) + assert.Error(t, err) + mismatchedOutputID := "success" + errorStr := fmt.Sprintf("could not find output id %q in output schema", mismatchedOutputID) + assert.Contains(t, err.Error(), errorStr) +} diff --git a/workflow/model.go b/workflow/model.go index 5ff2b02a..54ccba15 100644 --- a/workflow/model.go +++ b/workflow/model.go @@ -27,7 +27,7 @@ type Workflow struct { // OutputSchema is an optional override for the automatically inferred output schema from the Outputs data and // expressions. The keys must be the output IDs from Outputs and the values must be a StepOutputSchema object as // per the Arcaflow schema. - OutputSchema map[string]any `json:"outputSchema"` + OutputSchema map[string]*schema.StepOutputSchema `json:"outputSchema"` // Output is the legacy way to define a single output. It conflicts the "outputs" field and if filled, will create a // "success" output. // @@ -35,8 +35,8 @@ type Workflow struct { Output any `json:"output"` } -// getSchema returns the entire workflow schema. -func getSchema() *schema.TypedScopeSchema[*Workflow] { +// GetSchema returns the entire workflow schema. 
+func GetSchema() *schema.TypedScopeSchema[*Workflow] { return schema.NewTypedScopeSchema[*Workflow]( schema.NewStructMappedObjectSchema[*Workflow]( "Workflow", diff --git a/workflow/model_test.go b/workflow/model_test.go new file mode 100644 index 00000000..03556be5 --- /dev/null +++ b/workflow/model_test.go @@ -0,0 +1,69 @@ +package workflow_test + +import ( + "go.arcalot.io/assert" + "go.flow.arcalot.io/engine/workflow" + "testing" +) + +var versionExp = "v0.2.0" +var inputExp = map[string]any{ + "root": "RootObject", + "objects": map[string]any{ + "RootObject": map[string]any{ + "id": "RootObject", + "properties": map[string]any{}}}, +} +var stepsExp = map[string]any{ + "long_wait": map[string]any{ + "plugin": map[string]any{ + "src": "n/a", + "deployment_type": "builtin", + }, + "step": "wait", + "input": map[string]any{ + "wait_time_ms": 1}}, +} +var outputID = "success" +var outputsExp = map[string]any{ + outputID: "!expr $.steps.long_wait.outputs", +} +var outputSchemaRootID = "RootObjectOut" +var stepOutputSchemaInput = map[string]any{ + "schema": map[string]any{ + "root": outputSchemaRootID, + "objects": map[string]any{ + outputSchemaRootID: map[string]any{ + "id": outputSchemaRootID, + "properties": map[string]any{ + "message": map[string]any{ + "type": map[string]any{ + "type_id": "string", + }}}}}}, +} +var outputSchemaInput = map[string]any{ + outputID: stepOutputSchemaInput, +} +var workflowSchemaInput = map[string]any{ + "version": versionExp, + "input": inputExp, + "steps": stepsExp, + "outputs": outputsExp, + "outputSchema": outputSchemaInput, +} + +// Test_SchemaWorkflow_ValidateCompatibility tests the workflow model schema's ability +// to validate the compatibility of a fully specified and valid +// workflow. 
+func Test_SchemaWorkflow_ValidateCompatibility(t *testing.T) { + workflowModelSchema := workflow.GetSchema() + assert.NoError(t, workflowModelSchema.ValidateCompatibility(workflowSchemaInput)) +} + +// Test_SchemaWorkflow_UnserializeType tests the workflow model schema's ability +// to unserialize a fully specified and valid workflow. +func Test_SchemaWorkflow_UnserializeType(t *testing.T) { + workflowModelSchema := workflow.GetSchema() + _, err := workflowModelSchema.UnserializeType(workflowSchemaInput) + assert.NoError(t, err) +} diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index 4888b7fa..b70650d7 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -968,3 +968,54 @@ func TestWorkflowWithEscapedCharacters(t *testing.T) { } } + +var workflowWithOutputSchemaMalformed = ` +version: v0.2.0 +input: + root: RootObject + objects: + RootObject: + id: RootObject + properties: {} +steps: + long_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 1 +outputs: + success: + first_step_output: !expr $.steps.long_wait.outputs +outputSchema: + success: + schema: + root: RootObjectOut + objects: + RootObjectOut: + id: RootObjectOut + properties: {}` + +func TestWorkflow_Execute_Error_MalformedOutputSchema(t *testing.T) { + pwf, err := createTestExecutableWorkflow(t, workflowWithOutputSchemaMalformed, map[string][]byte{}) + assert.NoError(t, err) + _, _, err = pwf.Execute(context.Background(), map[string]any{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "bug: output schema cannot unserialize") +} + +func createTestExecutableWorkflow(t *testing.T, workflowStr string, workflowCtx map[string][]byte) (workflow.ExecutableWorkflow, error) { + logConfig := log.Config{ + Level: log.LevelDebug, + Destination: log.DestinationStdout, + } + logger := log.New(logConfig) + cfg := &config.Config{Log: logConfig} + stepRegistry := NewTestImplStepRegistry(logger, t) + executor := 
lang.Must2(workflow.NewExecutor(logger, cfg, stepRegistry, + builtinfunctions.GetFunctions(), + )) + wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(workflowStr))) + return executor.Prepare(wf, workflowCtx) +} diff --git a/workflow/yaml.go b/workflow/yaml.go index 78bc78d9..a1d5b2d8 100644 --- a/workflow/yaml.go +++ b/workflow/yaml.go @@ -41,7 +41,7 @@ func (y yamlConverter) FromYAML(data []byte) (*Workflow, error) { return nil, &ErrInvalidWorkflow{err} } - workflowSchema := getSchema() + workflowSchema := GetSchema() workflow, err := workflowSchema.UnserializeType(rawWorkflow) if err != nil { return nil, &ErrInvalidWorkflow{err}