Skip to content

Commit

Permalink
pkg/workflows/sdk: add WorkflowSpecFactory.BeginSerial/BeginAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Oct 1, 2024
1 parent fefe57c commit afa5ef4
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 5 deletions.
38 changes: 37 additions & 1 deletion pkg/workflows/sdk/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,22 @@ type WorkflowSpecFactory struct {
emptyNames bool
badCapTypes []string
fns map[string]func(runtime Runtime, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error)
serialMode bool
prevRefs []string
}

func (w *WorkflowSpecFactory) GetFn(name string) func(sdk Runtime, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
return w.fns[name]
}

func (w *WorkflowSpecFactory) BeginSerial() {
w.serialMode = true
}

func (w *WorkflowSpecFactory) BeginAsync() {
w.serialMode = false
}

type CapDefinition[O any] interface {
Ref() any
self() CapDefinition[O]
Expand Down Expand Up @@ -98,6 +108,15 @@ type NewWorkflowParams struct {
Name string
}

// NewSerialWorkflowSpecFactory returns a new WorkflowSpecFactory in Serial mode.
// This is the same as calling NewWorkflowSpecFactory then WorkflowSpecFactory.BeginSerial.
func NewSerialWorkflowSpecFactory(params NewWorkflowParams) *WorkflowSpecFactory {
f := NewWorkflowSpecFactory(params)
f.BeginSerial()
return f
}

// NewWorkflowSpecFactory returns a new NewWorkflowSpecFactory.
func NewWorkflowSpecFactory(
params NewWorkflowParams,
) *WorkflowSpecFactory {
Expand All @@ -119,6 +138,16 @@ func NewWorkflowSpecFactory(
// AddTo is meant to be called by generated code
func (step *Step[O]) AddTo(w *WorkflowSpecFactory) CapDefinition[O] {
stepDefinition := step.Definition

if w.serialMode {
// ensure we depend on each previous step
for _, prevRef := range w.prevRefs {
if !stepDefinition.Inputs.HasRef(prevRef) {
stepDefinition.Condition = fmt.Sprintf("$(%s.success)", prevRef)
}
}
}

stepRef := stepDefinition.Ref
if w.names[stepRef] && stepDefinition.CapabilityType != capabilities.CapabilityTypeTarget {
w.duplicateNames[stepRef] = true
Expand All @@ -143,7 +172,14 @@ func (step *Step[O]) AddTo(w *WorkflowSpecFactory) CapDefinition[O] {
w.badCapTypes = append(w.badCapTypes, stepDefinition.ID)
}

return &capDefinitionImpl[O]{ref: fmt.Sprintf("$(%s.outputs)", step.Definition.Ref)}
c := &capDefinitionImpl[O]{ref: fmt.Sprintf("$(%s.outputs)", step.Definition.Ref)}

if w.serialMode {
w.prevRefs = []string{step.Definition.Ref}
} else {
w.prevRefs = append(w.prevRefs, step.Definition.Ref)
}
return c
}

// AccessField is meant to be used by generated code
Expand Down
28 changes: 28 additions & 0 deletions pkg/workflows/sdk/testdata/fixtures/charts/builder_serial.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
```mermaid
flowchart
trigger[\"<b>trigger</b><br>trigger<br><i>(basic-test-trigger[at]1.0.0)</i>"/]
compute["<b>compute</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
get-bar -- Value --> compute
get-baz -- Value --> compute
get-foo -- Value --> compute
get-bar["<b>get-bar</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
get-foo -..-> get-bar
trigger -- cool_output --> get-bar
get-baz["<b>get-baz</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
get-bar -..-> get-baz
trigger -- cool_output --> get-baz
get-foo["<b>get-foo</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
trigger -- cool_output --> get-foo
consensus[["<b>consensus</b><br>consensus<br><i>(offchain_reporting[at]1.0.0)</i>"]]
compute -- Value --> consensus
unnamed6[/"target<br><i>(id)</i>"\]
consensus --> unnamed6
```
13 changes: 9 additions & 4 deletions pkg/workflows/sdk/workflow_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,11 @@ func (os outputs) addOutput(s string) {
//
// Within the workflow spec, they are called "Capability Properties".
type StepDefinition struct {
ID string
Ref string
Inputs StepInputs
Config map[string]any
ID string
Ref string
Condition string
Inputs StepInputs
Config map[string]any

CapabilityType capabilities.CapabilityType
}
Expand Down Expand Up @@ -193,6 +194,10 @@ var tmpl = template.Must(template.New("").Funcs(map[string]any{
{{ else -}}
{{ $ref }}["{{$name}}"]
{{ end -}}
{{ $condRef := parseRef .Condition -}}
{{ if $condRef -}}
{{ $condRef }} -..-> {{ $step.Ref }}
{{ end -}}
{{ if .Inputs.OutputRef -}}
{{ .Inputs.OutputRef }} --> {{ $step.Ref }}
{{ else -}}
Expand Down
87 changes: 87 additions & 0 deletions pkg/workflows/sdk/workflow_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ func TestWorkflowSpecFormatChart(t *testing.T) {
{"notstreamssepolia", notStreamSepoliaWorkflowSpec},
{"serial", serialWorkflowSpec},
{"parallel", parallelWorkflowSpec},
{"parallel_serialized", parallelSerializedWorkflowSpec},
{"builder_parallel", buildSimpleWorkflowSpec(
sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Owner: "test", Name: "parallel"}),
).MustSpec(t)},
{"builder_serial", buildSimpleWorkflowSpec(
sdk.NewSerialWorkflowSpecFactory(sdk.NewWorkflowParams{Owner: "test", Name: "serial"}),
).MustSpec(t)},
} {
t.Run(tt.name, func(t *testing.T) {
requireEqualChart(t, tt.name, tt.workflow)
Expand Down Expand Up @@ -386,3 +390,86 @@ var parallelWorkflowSpec = sdk.WorkflowSpec{
},
},
}

var parallelSerializedWorkflowSpec = sdk.WorkflowSpec{
Name: "parallel-serialized",
Owner: "owner",
Triggers: []sdk.StepDefinition{
{
ID: "[email protected]",
Ref: "trigger-chain-event",
Inputs: sdk.StepInputs{},
Config: map[string]any{"maxFrequencyMs": 5000},
CapabilityType: capabilities.CapabilityTypeTrigger,
},
},
Actions: []sdk.StepDefinition{
{
ID: "[email protected]",
Ref: "get-foo",
Inputs: sdk.StepInputs{
Mapping: map[string]any{"Arg0": "$(trigger-chain-event.outputs)"},
},
CapabilityType: capabilities.CapabilityTypeAction,
},
{
ID: "[email protected]",
Ref: "compute-foo",
Inputs: sdk.StepInputs{
Mapping: map[string]any{"Arg0": "$(get-foo.outputs)"},
},
CapabilityType: capabilities.CapabilityTypeAction,
},
{
ID: "[email protected]",
Ref: "get-bar",
Condition: "$(compute-foo.success)",
Inputs: sdk.StepInputs{
Mapping: map[string]any{"Arg0": "$(trigger-chain-event.outputs)"},
},
CapabilityType: capabilities.CapabilityTypeAction,
},
{
ID: "[email protected]",
Ref: "compute-bar",
Inputs: sdk.StepInputs{
Mapping: map[string]any{"Arg0": "$(get-bar.outputs)"},
},
CapabilityType: capabilities.CapabilityTypeAction,
},
{
ID: "[email protected]",
Ref: "read-token-price",
Condition: "$(compute-bar.success)",
Inputs: sdk.StepInputs{
Mapping: map[string]any{"Arg0": "$(trigger-chain-event.outputs)"},
},
CapabilityType: capabilities.CapabilityTypeAction,
},
},
Consensus: []sdk.StepDefinition{
{
ID: "[email protected]",
Ref: "data-feeds-report",
Inputs: sdk.StepInputs{
Mapping: map[string]any{
"observations": []string{
"$(compute-foo.outputs.Value)",
"$(compute-bar.outputs.Value)",
},
"token_price": "$(read-token-price.outputs.Value)",
},
},
CapabilityType: capabilities.CapabilityTypeConsensus,
},
},
Targets: []sdk.StepDefinition{
{
ID: "[email protected]",
Inputs: sdk.StepInputs{
Mapping: map[string]any{"signed_report": "$(data-feeds-report.outputs)"},
},
CapabilityType: capabilities.CapabilityTypeTarget,
},
},
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
{
"ID": "[email protected]",
"Ref": "report_data",
"Condition": "",
"Inputs": {
"OutputRef": "",
"Mapping": null
Expand All @@ -18,6 +19,7 @@
{
"ID": "trigger_test:aaShouldBeFirst_true:chain_ethereum:[email protected]",
"Ref": "",
"Condition": "",
"Inputs": {
"OutputRef": "",
"Mapping": {
Expand All @@ -34,6 +36,7 @@
{
"ID": "[email protected]",
"Ref": "",
"Condition": "",
"Inputs": {
"OutputRef": "",
"Mapping": {
Expand Down

0 comments on commit afa5ef4

Please sign in to comment.