Skip to content

Commit

Permalink
only unserialize input to parse validity (#121)
Browse files Browse the repository at this point in the history
* only unserialize input to parse validity

* unserialize less please

* refactor workflow interface and schema keys

* fix style

* fix lint again
  • Loading branch information
mfleader authored Nov 6, 2023
1 parent ebc95a5 commit bd5866c
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 16 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
go.flow.arcalot.io/dockerdeployer v0.5.0
go.flow.arcalot.io/expressions v0.2.1
go.flow.arcalot.io/kubernetesdeployer v0.8.0
go.flow.arcalot.io/pluginsdk v0.5.0
go.flow.arcalot.io/pluginsdk v0.5.1
go.flow.arcalot.io/podmandeployer v0.6.1
go.flow.arcalot.io/pythondeployer v0.4.0
go.flow.arcalot.io/testdeployer v0.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ go.flow.arcalot.io/expressions v0.2.1 h1:TAAbDrgJJLpmgA5ASyP/KzrXWtpEaQ8JsCPHgpe
go.flow.arcalot.io/expressions v0.2.1/go.mod h1:Vw1ScNu4Uyw1/l87LAH8jxe0DyRWwMh+rlfB/BPYDOU=
go.flow.arcalot.io/kubernetesdeployer v0.8.0 h1:UjH/aspPif/k+X65sLWlNDZAW5JlzUfgOnLHOrhxEQk=
go.flow.arcalot.io/kubernetesdeployer v0.8.0/go.mod h1:BhERhKpvQMJkrcW9lbBF4kJEe+OGhz2NpSftZIgtVNQ=
go.flow.arcalot.io/pluginsdk v0.5.0 h1:TRS/waCTcdoMZ9neDAcfy3zpzyDnPHRbhV+Y1kpcw3Y=
go.flow.arcalot.io/pluginsdk v0.5.0/go.mod h1:2s2f//7uOkBjr1QaiWJD/bqDIeLlINJtD1BhiY4aGPM=
go.flow.arcalot.io/pluginsdk v0.5.1 h1:ebb2ThAqmjmwGpDyKpd1wEDUisPqPabgARjFohy47Io=
go.flow.arcalot.io/pluginsdk v0.5.1/go.mod h1:2s2f//7uOkBjr1QaiWJD/bqDIeLlINJtD1BhiY4aGPM=
go.flow.arcalot.io/podmandeployer v0.6.1 h1:NPSQ82PyvxTRDsHuLj6oBhPUlhVVqPlnXWkElF2PcT8=
go.flow.arcalot.io/podmandeployer v0.6.1/go.mod h1:4wfcl0qjV02y64We3ZSDz+3lwdOfbe+gpFjm7SQKTRA=
go.flow.arcalot.io/pythondeployer v0.4.0 h1:l8nw6awYMVzgND+ZXdbnNJPYu3V0sgSUFsIzn+SRgh0=
Expand Down
5 changes: 1 addition & 4 deletions workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ type ExecutableWorkflow interface {
// must only contain primitives (float, int, bool, string, map, slice) and may not contain structs and other
// elements. The output will consist of the output ID, the returned output data corresponding to the output IDs
// schema, or if an error happened, the error.
Execute(
ctx context.Context,
input any,
) (outputID string, outputData any, err error)
Execute(ctx context.Context, serializedInput any) (outputID string, outputData any, err error)

// OutputSchema returns the schema for the possible outputs of this workflow.
OutputSchema() map[string]*schema.StepOutputSchema
Expand Down
26 changes: 17 additions & 9 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ import (
"go.flow.arcalot.io/pluginsdk/schema"
)

const (
// WorkflowInputKey is the key in the workflow map for input.
WorkflowInputKey = "input"
// WorkflowStepsKey is the key in the workflow map for the steps.
WorkflowStepsKey = "steps"
)

// executableWorkflow is an implementation of the ExecutableWorkflow interface that provides a workflow you can actually
// run.
type executableWorkflow struct {
Expand Down Expand Up @@ -48,9 +55,10 @@ func (e *executableWorkflow) DAG() dgraph.DirectedGraph[*DAGItem] {

// Execute runs the workflow with the specified input. You can use the context variable to abort the workflow execution
// (e.g. when the user presses Ctrl+C).
func (e *executableWorkflow) Execute(ctx context.Context, input any) (outputID string, outputData any, err error) { //nolint:gocognit
func (e *executableWorkflow) Execute(ctx context.Context, serializedInput any) (outputID string, outputData any, err error) { //nolint:gocognit
// First, we unserialize the input. This makes sure we didn't get garbage data.
unserializedInput, err := e.input.Unserialize(input)

_, err = e.input.Unserialize(serializedInput)
if err != nil {
return "", nil, fmt.Errorf("invalid workflow input (%w)", err)
}
Expand All @@ -64,8 +72,8 @@ func (e *executableWorkflow) Execute(ctx context.Context, input any) (outputID s
config: e.config,
lock: &sync.Mutex{},
data: map[string]any{
"input": unserializedInput,
"steps": map[string]any{},
WorkflowInputKey: serializedInput,
WorkflowStepsKey: map[string]any{},
},
dag: e.dag.Clone(),
inputsNotified: make(map[string]struct{}, len(e.dag.ListNodes())),
Expand All @@ -86,14 +94,14 @@ func (e *executableWorkflow) Execute(ctx context.Context, input any) (outputID s
runnableStep := runnableStep
stepDataModel := map[string]any{}
for _, stage := range e.lifecycles[stepID].Stages {
steps := l.data["steps"].(map[string]any)
steps := l.data[WorkflowStepsKey].(map[string]any)
if _, ok := steps[stepID]; !ok {
steps[stepID] = map[string]any{}
}
stages := steps[stepID].(map[string]any)
stages[stage.ID] = map[string]any{}
}
l.data["steps"].(map[string]any)[stepID] = stepDataModel
l.data[WorkflowStepsKey].(map[string]any)[stepID] = stepDataModel

var stageHandler step.StageChangeHandler = &stageChangeHandler{
onStageChange: func(
Expand Down Expand Up @@ -150,7 +158,7 @@ func (e *executableWorkflow) Execute(ctx context.Context, input any) (outputID s
// We remove the input node from the DAG and call the notifySteps function once to trigger the workflow
// start.
e.logger.Debugf("Starting workflow execution...\n%s", l.dag.Mermaid())
inputNode, err := l.dag.GetNodeByID("input")
inputNode, err := l.dag.GetNodeByID(WorkflowInputKey)
if err != nil {
return "", nil, fmt.Errorf("bug: cannot obtain input node (%w)", err)
}
Expand Down Expand Up @@ -301,8 +309,8 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo
}

// Placing data from the output into the general data structure
l.data["steps"].(map[string]any)[stepID].(map[string]any)[*previousStage] = map[string]any{}
l.data["steps"].(map[string]any)[stepID].(map[string]any)[*previousStage].(map[string]any)[*previousStageOutputID] = *previousStageOutput
l.data[WorkflowStepsKey].(map[string]any)[stepID].(map[string]any)[*previousStage] = map[string]any{}
l.data[WorkflowStepsKey].(map[string]any)[stepID].(map[string]any)[*previousStage].(map[string]any)[*previousStageOutputID] = *previousStageOutput
}
l.notifySteps()
}
Expand Down

0 comments on commit bd5866c

Please sign in to comment.