Skip to content

Commit

Permalink
Added optional expression tags (#212)
Browse files Browse the repository at this point in the history
* Added optional expression tags

* Fix linter error

* Improved comment for a test

* Addressed review comments
  • Loading branch information
jaredoconnell authored Sep 20, 2024
1 parent ddae28f commit 62797d2
Show file tree
Hide file tree
Showing 11 changed files with 600 additions and 43 deletions.
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
module go.flow.arcalot.io/engine

go 1.21
go 1.22.0

toolchain go1.22.2

require (
go.arcalot.io/assert v1.8.0
go.arcalot.io/dgraph v1.6.0
go.arcalot.io/dgraph v1.7.0
go.arcalot.io/lang v1.1.0
go.arcalot.io/log/v2 v2.2.0
go.flow.arcalot.io/deployer v0.6.1
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.arcalot.io/assert v1.8.0 h1:hGcHMPncQXwQvjj7MbyOu2gg8VIBB00crUJZpeQOjxs=
go.arcalot.io/assert v1.8.0/go.mod h1:nNmWPoNUHFyrPkNrD2aASm5yPuAfiWdB/4X7Lw3ykHk=
go.arcalot.io/dgraph v1.6.0 h1:mJFZ1vdPEg3KtqyhNqYtWVAkxxWBWoJVUFZQ2Z4mbvE=
go.arcalot.io/dgraph v1.6.0/go.mod h1:+Kxc81utiihMSmC1/ttSPGLDlWPpvgOpNxSFmIDPxFM=
go.arcalot.io/dgraph v1.7.0 h1:KmVoPoV7jKbc7DgBS7Bmh/gliEXw4S4kyeVpPvyIygs=
go.arcalot.io/dgraph v1.7.0/go.mod h1:P8mMBGCZbIMVe08iw0afFBMl1QM3aJk+MU//Q2Z5rJc=
go.arcalot.io/exex v0.2.0 h1:u44pjwPwcH57TF8knhaqVZP/1V/KbnRe//pKzMwDpLw=
go.arcalot.io/exex v0.2.0/go.mod h1:5zlFr+7vOQNZKYCNOEDdsad+z/dlvXKs2v4kG+v+bQo=
go.arcalot.io/lang v1.1.0 h1:ugglRKpd3qIMkdghAjKJxsziIgHm8QpxrzZPSXoa08I=
Expand All @@ -135,8 +135,6 @@ go.flow.arcalot.io/deployer v0.6.1 h1:Q65VHeRZzdrMJZqTRb26EQZQbK+C3pORETVlpw02xW
go.flow.arcalot.io/deployer v0.6.1/go.mod h1:Oh+71KYQEof6IS3UGhpMyjQQPRcuomUccn7fwAqrPxE=
go.flow.arcalot.io/dockerdeployer v0.7.3 h1:CLvSdqfoE8oZADI0wfry46SXR4CQjB6Qh+6Ym70zheQ=
go.flow.arcalot.io/dockerdeployer v0.7.3/go.mod h1:YWw9+GbYJxEnlahlYCx4UOJe+QNkecf8+EBtSIQD0aE=
go.flow.arcalot.io/expressions v0.4.3 h1:0BRRghutHp0sctsITHe/A1le0yYiJtKNTxm27T+P6Og=
go.flow.arcalot.io/expressions v0.4.3/go.mod h1:UORX78N4ep71wOzNXdIo/UY+6SdDD0id0mvuRNEQMeM=
go.flow.arcalot.io/expressions v0.4.4 h1:bYTC7YDmgDWcsdyY41+IvTJbvsM1rdE3ZBJhB+jNPHQ=
go.flow.arcalot.io/expressions v0.4.4/go.mod h1:0Y2LgynO1SWA4bqsnKlCxqLME9zOR8tWKg3g+RG+FFQ=
go.flow.arcalot.io/kubernetesdeployer v0.9.3 h1:XKiqmCqXb6ZLwP5IQTAKS/gJHpq0Ub/yEjCfgAwQF2A=
Expand Down
9 changes: 7 additions & 2 deletions internal/infer/infer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func Type(
return nil, fmt.Errorf("failed to evaluate type of expression %s (%w)", expr.String(), err)
}
return oneOfType, nil
case *OptionalExpression:
return Type(expr.Expr, internalDataModel, functions, workflowContext)
}

v := reflect.ValueOf(data)
Expand Down Expand Up @@ -198,14 +200,17 @@ func objectType(
) (schema.Type, error) {
properties := make(map[string]*schema.PropertySchema, value.Len())
for _, keyValue := range value.MapKeys() {
propertyType, err := Type(value.MapIndex(keyValue).Interface(), internalDataModel, functions, workflowContext)
inferredValue := value.MapIndex(keyValue).Interface()
propertyType, err := Type(inferredValue, internalDataModel, functions, workflowContext)
if err != nil {
return nil, fmt.Errorf("failed to infer property %s type (%w)", keyValue.Interface(), err)
}
_, isOptionalExpr := inferredValue.(*OptionalExpression)

properties[keyValue.Interface().(string)] = schema.NewPropertySchema(
propertyType,
nil,
true,
!isOptionalExpr,
nil,
nil,
nil,
Expand Down
13 changes: 13 additions & 0 deletions internal/infer/optional_expression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package infer

import (
"go.flow.arcalot.io/expressions"
)

// OptionalExpression is an expression that can be used in an object as an optional field.
type OptionalExpression struct {
Expr expressions.Expression
WaitForCompletion bool
GroupNodePath string
ParentNodePath string
}
2 changes: 1 addition & 1 deletion workflow/any.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (a *anySchemaWithExpressions) Serialize(data any) (any, error) {

func (a *anySchemaWithExpressions) checkAndConvert(data any) (any, error) {
switch data.(type) {
case expressions.Expression, infer.OneOfExpression, *infer.OneOfExpression:
case expressions.Expression, infer.OneOfExpression, *infer.OneOfExpression, *infer.OptionalExpression:
return data, nil
}
t := reflect.ValueOf(data)
Expand Down
71 changes: 59 additions & 12 deletions workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,13 +577,13 @@ func (e *executor) createTypeStructure(rootSchema schema.Scope, inputField any,
return inputField.Type(rootSchema, e.callableFunctionSchemas, workflowContext)
case *infer.OneOfExpression:
return inputField.Type(rootSchema, e.callableFunctionSchemas, workflowContext)
case *infer.OptionalExpression:
return e.createTypeStructure(rootSchema, inputField.Expr, workflowContext)
}

v := reflect.ValueOf(inputField)
switch v.Kind() {
case reflect.Slice:
// Okay. Construct the list of schemas, and pass it into the

result := make([]any, v.Len())
for i := 0; i < v.Len(); i++ {
value := v.Index(i).Interface()
Expand Down Expand Up @@ -917,6 +917,8 @@ func (e *executor) prepareDependencies(
return e.prepareExprDependencies(s, workflowContext, currentNode, outputSchema, dag)
case *infer.OneOfExpression:
return e.prepareOneOfExprDependencies(s, workflowContext, currentNode, pathInCurrentNode, outputSchema, dag)
case *infer.OptionalExpression:
return e.prepareOptionalExprDependencies(s, workflowContext, currentNode, pathInCurrentNode, outputSchema, dag)
default:
return &ErrInvalidWorkflow{fmt.Errorf("unsupported struct/pointer type in workflow input: %T", stepData)}
}
Expand Down Expand Up @@ -1006,6 +1008,55 @@ func (e *executor) prepareExprDependencies(
return nil
}

func (e *executor) createGroupNode(
currentNode dgraph.Node[*DAGItem],
pathInCurrentNode []string,
dag dgraph.DirectedGraph[*DAGItem],
dependencyType dgraph.DependencyType,
) (dgraph.Node[*DAGItem], error) {
groupNodeType := &DAGItem{
Kind: DagItemKindDependencyGroup,
}
groupedDagNode, err := dag.AddNode(
currentNode.ID()+"."+strings.Join(pathInCurrentNode, "."), groupNodeType)
if err != nil {
return nil, err
}
err = currentNode.ConnectDependency(groupedDagNode.ID(), dependencyType)
if err != nil {
return nil, err
}
return groupedDagNode, nil
}

func (e *executor) prepareOptionalExprDependencies(
expr *infer.OptionalExpression,
workflowContext map[string][]byte,
currentNode dgraph.Node[*DAGItem],
pathInCurrentNode []string,
outputSchema *schema.ScopeSchema,
dag dgraph.DirectedGraph[*DAGItem],
) error {
var dependencyType dgraph.DependencyType
if expr.WaitForCompletion {
dependencyType = dgraph.CompletionAndDependency
} else {
dependencyType = dgraph.OptionalDependency
}

// Creates a new group node to isolate the optional dependencies.
// The current node will depend on the group node with the dependency type set in `dependencyType`.
optionalDagNode, err := e.createGroupNode(currentNode, pathInCurrentNode, dag, dependencyType)
if err != nil {
return err
}

expr.GroupNodePath = optionalDagNode.ID()
expr.ParentNodePath = currentNode.ID()

return e.prepareExprDependencies(expr.Expr, workflowContext, optionalDagNode, outputSchema, dag)
}

func (e *executor) prepareOneOfExprDependencies(
expr *infer.OneOfExpression,
workflowContext map[string][]byte,
Expand All @@ -1020,25 +1071,21 @@ func (e *executor) prepareOneOfExprDependencies(
if len(expr.Options) == 0 {
return fmt.Errorf("oneof %s has no options", expr.String())
}
// In case there are multiple OneOfs, each oneof needs its own node.
orNodeType := &DAGItem{
Kind: DagItemKindOrGroup,
}
oneofDagNode, err := dag.AddNode(
currentNode.ID()+"."+strings.Join(pathInCurrentNode, "."), orNodeType)
if err != nil {
return err
groupNodeType := &DAGItem{
Kind: DagItemKindDependencyGroup,
}
err = currentNode.ConnectDependency(oneofDagNode.ID(), dgraph.AndDependency)
// In case there are multiple OneOfs, each oneof needs its own node.
oneofDagNode, err := e.createGroupNode(currentNode, pathInCurrentNode, dag, dgraph.AndDependency)
if err != nil {
return err
}

// Mark the node ID on the OneOfExpression. This mutates the expression, so make sure
// this is not operating on a copy of the schema for the data to be retained.
expr.NodePath = oneofDagNode.ID()
for optionID, optionData := range expr.Options {
optionDagNode, err := dag.AddNode(
oneofDagNode.ID()+"."+optionID, orNodeType)
oneofDagNode.ID()+"."+optionID, groupNodeType)
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions workflow/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ const (
DAGItemKindStepStageOutput DAGItemKind = "stepStageOutput"
// DAGItemKindOutput indicates a DAG node for the workflow output.
DAGItemKindOutput DAGItemKind = "output"
// DagItemKindOrGroup indicates a DAG node used to complete a part of
// an input or output that needs dependencies grouped, typically for OR dependencies.
DagItemKindOrGroup DAGItemKind = "orGroup"
// DagItemKindDependencyGroup indicates a DAG node used to complete a part of
// an input or output that needs dependencies grouped, typically for OR dependencies
// or optional dependencies.
DagItemKindDependencyGroup DAGItemKind = "dependencyGroup"
)

// DAGItem is the internal structure of the DAG.
Expand Down
32 changes: 30 additions & 2 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func (l *loopState) notifySteps() { //nolint:gocognit
inputData := nodeItem.Data
if inputData == nil {
switch nodeItem.Kind {
case DagItemKindOrGroup:
case DagItemKindDependencyGroup:
if err := node.ResolveNode(dgraph.Resolved); err != nil {
panic(fmt.Errorf("error occurred while resolving workflow OR group node (%s)", err.Error()))
}
Expand Down Expand Up @@ -688,6 +688,8 @@ func (l *loopState) resolveExpressions(inputData any, dataModel any) (any, error
return expr.Evaluate(dataModel, l.callableFunctions, l.workflowContext)
case *infer.OneOfExpression:
return l.resolveOneOfExpression(expr, dataModel)
case *infer.OptionalExpression:
return l.resolveOptionalExpression(expr, dataModel)
}

v := reflect.ValueOf(inputData)
Expand All @@ -712,7 +714,9 @@ func (l *loopState) resolveExpressions(inputData any, dataModel any) (any, error
if err != nil {
return nil, fmt.Errorf("failed to resolve workflow map expressions (%w)", err)
}
result[key] = newValue
if newValue != nil { // In case it's an optional field.
result[key] = newValue
}
}
return result, nil
default:
Expand All @@ -725,6 +729,9 @@ func (l *loopState) resolveOneOfExpression(expr *infer.OneOfExpression, dataMode

// Get the node the OneOf uses to check which Or dependency resolved first (the others will either not be
// in the resolved list, or they will be obviated)
if expr.NodePath == "" {
return nil, fmt.Errorf("node path is empty in oneof expression %s", expr.String())
}
oneOfNode, err := l.dag.GetNodeByID(expr.NodePath)
if err != nil {
return nil, fmt.Errorf("failed to get node to resolve oneof expression (%w)", err)
Expand Down Expand Up @@ -777,6 +784,27 @@ func (l *loopState) resolveOneOfExpression(expr *infer.OneOfExpression, dataMode
return outputData, nil
}

func (l *loopState) resolveOptionalExpression(expr *infer.OptionalExpression, dataModel any) (any, error) {
l.logger.Debugf("Evaluating oneof expression %s...", expr.Expr.String())
if expr.ParentNodePath == "" {
return nil, fmt.Errorf("ParentNodePath is empty in resolve optional expression %s", expr.Expr.String())
}
if expr.GroupNodePath == "" {
return nil, fmt.Errorf("GroupNodePath is empty in resolve optional expression %s", expr.Expr.String())
}
// Check to see if the group node is resolved within the parent node
parentDagNode, err := l.dag.GetNodeByID(expr.ParentNodePath)
if err != nil {
return nil, fmt.Errorf("failed to get parent node to resolve optional expression (%w)", err)
}
resolvedDependencies := parentDagNode.ResolvedDependencies()
_, dependencyGroupResolved := resolvedDependencies[expr.GroupNodePath]
if !dependencyGroupResolved {
return nil, nil // It's nil to indicate that the optional field is not present.
}
return expr.Expr.Evaluate(dataModel, l.callableFunctions, l.workflowContext)
}

// stageChangeHandler is implementing step.StageChangeHandler.
type stageChangeHandler struct {
onStageChange func(
Expand Down
Loading

0 comments on commit 62797d2

Please sign in to comment.