Skip to content

Commit

Permalink
fix output schema bug (#163)
Browse files Browse the repository at this point in the history
* fix output schema type in workflow model schema

* add output schema happy path test

* add unserialize test

* add output schema error path tests
  • Loading branch information
mfleader committed Mar 11, 2024
1 parent da06fde commit 2d0d8f2
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 9 deletions.
10 changes: 5 additions & 5 deletions workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@ func (e *executor) Prepare(workflow *Workflow, workflowContext map[string][]byte
outputsSchema := map[string]*schema.StepOutputSchema{}
for outputID, outputData := range workflow.Outputs {
var outputSchema *schema.StepOutputSchema
if workflow.OutputSchema != nil && workflow.OutputSchema[outputID] != nil {
outputSchemaData, err := schema.DescribeStepOutput().Unserialize(workflow.OutputSchema[outputID])
if err != nil {
return nil, fmt.Errorf("unable to decode workflow output schema %s (%w)", outputID, err)
if workflow.OutputSchema != nil {
outputSchemaData, ok := workflow.OutputSchema[outputID]
if !ok {
return nil, fmt.Errorf("could not find output id %q in output schema", outputID)
}
outputSchema = outputSchemaData.(*schema.StepOutputSchema)
outputSchema = outputSchemaData
}
outputSchema, err = infer.OutputSchema(
outputData,
Expand Down
39 changes: 39 additions & 0 deletions workflow/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,42 @@ func TestDependOnNoOutputs(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), `object wait_1 does not have a property named "deploy"`)
}

var workflowWithOutputSchema = `
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
long_wait:
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: 1
outputs:
success:
first_step_output: !expr $.steps.long_wait.outputs
outputSchema:
missingno:
schema:
root: RootObjectOut
objects:
RootObjectOut:
id: RootObjectOut
properties:
message:
type:
type_id: string`

func TestWorkflow_Execute_Error_OutputSchema_OutputKey(t *testing.T) {
_, err := createTestExecutableWorkflow(t, workflowWithOutputSchema, map[string][]byte{})
assert.Error(t, err)
mismatchedOutputID := "success"
errorStr := fmt.Sprintf("could not find output id %q in output schema", mismatchedOutputID)
assert.Contains(t, err.Error(), errorStr)
}
6 changes: 3 additions & 3 deletions workflow/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ type Workflow struct {
// OutputSchema is an optional override for the automatically inferred output schema from the Outputs data and
// expressions. The keys must be the output IDs from Outputs and the values must be a StepOutputSchema object as
// per the Arcaflow schema.
OutputSchema map[string]any `json:"outputSchema"`
OutputSchema map[string]*schema.StepOutputSchema `json:"outputSchema"`
// Output is the legacy way to define a single output. It conflicts the "outputs" field and if filled, will create a
// "success" output.
//
// Deprecated: use Outputs instead.
Output any `json:"output"`
}

// getSchema returns the entire workflow schema.
func getSchema() *schema.TypedScopeSchema[*Workflow] {
// GetSchema returns the entire workflow schema.
func GetSchema() *schema.TypedScopeSchema[*Workflow] {
return schema.NewTypedScopeSchema[*Workflow](
schema.NewStructMappedObjectSchema[*Workflow](
"Workflow",
Expand Down
69 changes: 69 additions & 0 deletions workflow/model_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package workflow_test

import (
"go.arcalot.io/assert"
"go.flow.arcalot.io/engine/workflow"
"testing"
)

var versionExp = "v0.2.0"
var inputExp = map[string]any{
"root": "RootObject",
"objects": map[string]any{
"RootObject": map[string]any{
"id": "RootObject",
"properties": map[string]any{}}},
}
var stepsExp = map[string]any{
"long_wait": map[string]any{
"plugin": map[string]any{
"src": "n/a",
"deployment_type": "builtin",
},
"step": "wait",
"input": map[string]any{
"wait_time_ms": 1}},
}
var outputID = "success"
var outputsExp = map[string]any{
outputID: "!expr $.steps.long_wait.outputs",
}
var outputSchemaRootID = "RootObjectOut"
var stepOutputSchemaInput = map[string]any{
"schema": map[string]any{
"root": outputSchemaRootID,
"objects": map[string]any{
outputSchemaRootID: map[string]any{
"id": outputSchemaRootID,
"properties": map[string]any{
"message": map[string]any{
"type": map[string]any{
"type_id": "string",
}}}}}},
}
var outputSchemaInput = map[string]any{
outputID: stepOutputSchemaInput,
}
var workflowSchemaInput = map[string]any{
"version": versionExp,
"input": inputExp,
"steps": stepsExp,
"outputs": outputsExp,
"outputSchema": outputSchemaInput,
}

// Test_SchemaWorkflow tests the workflow model schema's ability
// to validate the compatibility of a fully specified and valid
// workflow.
func Test_SchemaWorkflow_ValidateCompatibility(t *testing.T) {
workflowModelSchema := workflow.GetSchema()
assert.NoError(t, workflowModelSchema.ValidateCompatibility(workflowSchemaInput))
}

// Test_SchemaWorkflow tests the workflow model schema's ability
// to unserialize a fully specified and valid workflow.
func Test_SchemaWorkflow_UnserializeType(t *testing.T) {
workflowModelSchema := workflow.GetSchema()
_, err := workflowModelSchema.UnserializeType(workflowSchemaInput)
assert.NoError(t, err)
}
51 changes: 51 additions & 0 deletions workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,3 +968,54 @@ func TestWorkflowWithEscapedCharacters(t *testing.T) {

}
}

var workflowWithOutputSchemaMalformed = `
version: v0.2.0
input:
root: RootObject
objects:
RootObject:
id: RootObject
properties: {}
steps:
long_wait:
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
input:
wait_time_ms: 1
outputs:
success:
first_step_output: !expr $.steps.long_wait.outputs
outputSchema:
success:
schema:
root: RootObjectOut
objects:
RootObjectOut:
id: RootObjectOut
properties: {}`

func TestWorkflow_Execute_Error_MalformedOutputSchema(t *testing.T) {
pwf, err := createTestExecutableWorkflow(t, workflowWithOutputSchemaMalformed, map[string][]byte{})
assert.NoError(t, err)
_, _, err = pwf.Execute(context.Background(), map[string]any{})
assert.Error(t, err)
assert.Contains(t, err.Error(), "bug: output schema cannot unserialize")
}

func createTestExecutableWorkflow(t *testing.T, workflowStr string, workflowCtx map[string][]byte) (workflow.ExecutableWorkflow, error) {
logConfig := log.Config{
Level: log.LevelDebug,
Destination: log.DestinationStdout,
}
logger := log.New(logConfig)
cfg := &config.Config{Log: logConfig}
stepRegistry := NewTestImplStepRegistry(logger, t)
executor := lang.Must2(workflow.NewExecutor(logger, cfg, stepRegistry,
builtinfunctions.GetFunctions(),
))
wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML([]byte(workflowStr)))
return executor.Prepare(wf, workflowCtx)
}
2 changes: 1 addition & 1 deletion workflow/yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (y yamlConverter) FromYAML(data []byte) (*Workflow, error) {
return nil, &ErrInvalidWorkflow{err}
}

workflowSchema := getSchema()
workflowSchema := GetSchema()
workflow, err := workflowSchema.UnserializeType(rawWorkflow)
if err != nil {
return nil, &ErrInvalidWorkflow{err}
Expand Down

0 comments on commit 2d0d8f2

Please sign in to comment.