pkg/workflows/sdk: add WorkflowSpecFactory.BeginSerial/BeginAsync #821

Draft pull request: wants to merge 2 commits into base branch workflowchart.
4 changes: 4 additions & 0 deletions Makefile
@@ -47,3 +47,7 @@ lint-workspace:

lint:
@./script/lint.sh $(GOLANGCI_LINT_VERSION) "$(GOLANGCI_LINT_COMMON_OPTS)" $(GOLANGCI_LINT_DIRECTORY) "--new-from-rev=origin/main"

.PHONY: test-quiet
test-quiet:
go test ./... | grep -v "\[no test files\]" | grep -v "\(cached\)"
4 changes: 1 addition & 3 deletions go.mod
@@ -1,8 +1,6 @@
module github.com/smartcontractkit/chainlink-common

go 1.22.0

toolchain go1.22.7
go 1.23

require (
github.com/andybalholm/brotli v1.1.0
21 changes: 21 additions & 0 deletions pkg/capabilities/capabilities.go
@@ -1,6 +1,7 @@
package capabilities

import (
"cmp"
"context"
"fmt"
"regexp"
@@ -53,6 +54,26 @@ func (c CapabilityType) IsValid() error {
return fmt.Errorf("invalid capability type: %s", c)
}

func (c CapabilityType) cmpOrder() int {
switch c {
case CapabilityTypeTrigger:
return 0
case CapabilityTypeAction:
return 1
case CapabilityTypeConsensus:
return 2
case CapabilityTypeTarget:
return 3
case CapabilityTypeUnknown:
return 4
default:
return 5
}
}
func (c CapabilityType) Compare(c2 CapabilityType) int {
return cmp.Compare(c.cmpOrder(), c2.cmpOrder())
}
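
The new Compare makes it straightforward to order steps by capability type (trigger, then action, consensus, target). A minimal sketch, assuming only the standard library slices package and the constants already defined in this package:

```go
package main

import (
	"fmt"
	"slices"

	"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
)

func main() {
	types := []capabilities.CapabilityType{
		capabilities.CapabilityTypeTarget,
		capabilities.CapabilityTypeTrigger,
		capabilities.CapabilityTypeConsensus,
		capabilities.CapabilityTypeAction,
	}
	// Compare follows cmpOrder: trigger < action < consensus < target < unknown.
	slices.SortFunc(types, capabilities.CapabilityType.Compare)
	fmt.Println(types) // expected order: trigger, action, consensus, target
}
```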

// CapabilityResponse is a struct for the Execute response of a capability.
type CapabilityResponse struct {
Value *values.Map
2 changes: 1 addition & 1 deletion pkg/workflows/models_yaml_test.go
@@ -49,7 +49,7 @@ var transformJSON = cmp.FilterValues(func(x, y []byte) bool {
return out
}))

func TestWorkflowSpecMarshalling(t *testing.T) {
func TestWorkflowSpecYamlMarshalling(t *testing.T) {
t.Parallel()
fixtureReader := yamlFixtureReaderBytes(t, "marshalling")

38 changes: 37 additions & 1 deletion pkg/workflows/sdk/builder.go
@@ -15,12 +15,22 @@ type WorkflowSpecFactory struct {
emptyNames bool
badCapTypes []string
fns map[string]func(runtime Runtime, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error)
serialMode bool
prevRefs []string
}

func (w *WorkflowSpecFactory) GetFn(name string) func(sdk Runtime, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
return w.fns[name]
}

func (w *WorkflowSpecFactory) BeginSerial() {
w.serialMode = true
}

func (w *WorkflowSpecFactory) BeginAsync() {
w.serialMode = false
}

Collaborator: Nit: serial and parallel pair better, or concurrent. All the steps are actually async under the hood.

Collaborator Author (@jmank88, Oct 2, 2024): Would EndSerial() make sense? Edit: Or just SetSerial(bool) in place of both?

Collaborator Author: Thinking forward about how to generalize this a bit might help too. Right now we have one global mode, but what if we want to build sub-graphs within one workflow, and control the mode of each subgraph as we construct it? I think it will be important for the API to be consistent throughout, and one test would be whether we can write helper funcs that are able to add steps to a workflow or a sub-workflow, without any special logic.

Contributor: Agree that subgraphs are an important generalization. Subgraphs and the phases/checkpoints that we discussed yesterday are similar concepts; I think you can implement phases with an ordered list of subgraphs such that the previous element in the list must complete before starting the given subgraph.

Collaborator Author: That makes me think of another error scenario: when a step fails, and so we do not execute the subsequent dependent steps, must we still propagate failure through them? I.e., if we have A --> B -..-> C, where B uses data from A and C runs only if B fails, then what happens if A fails? Do we skip B (and C)? Or do we propagate the failure through B, triggering C? I think users will want to express the latter, in terms of "if any part of the subgraph fails, run this alternate subgraph".
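
For orientation, a minimal usage sketch of the new mode toggles. It assumes only the NewWorkflowSpecFactory, NewWorkflowParams, BeginSerial, BeginAsync, and Spec names from this diff; the elided step registration would go through generated capability builders:

```go
package main

import (
	"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
)

func main() {
	w := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{Owner: "owner", Name: "example"})

	// Steps added while in serial mode implicitly depend on the previously
	// added step: AddTo injects a $(<prev>.success) condition unless the new
	// step already references that step in its inputs.
	w.BeginSerial()
	// ... add a trigger and dependent steps here via generated builders ...

	// Steps added after BeginAsync only depend on steps they explicitly
	// reference, so independent steps may run concurrently.
	w.BeginAsync()
	// ... add independent steps here ...

	_, _ = w.Spec()
}
```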

type CapDefinition[O any] interface {
capDefinition
self() CapDefinition[O]
@@ -107,6 +117,15 @@ type NewWorkflowParams struct {
Name string
}

// NewSerialWorkflowSpecFactory returns a new WorkflowSpecFactory in Serial mode.
// This is the same as calling NewWorkflowSpecFactory then WorkflowSpecFactory.BeginSerial.
func NewSerialWorkflowSpecFactory(params NewWorkflowParams) *WorkflowSpecFactory {
f := NewWorkflowSpecFactory(params)
f.BeginSerial()
return f
}
Comment on lines +122 to +126

Collaborator: Should it just be a param? We can default it either way right now because we don't have any actual WASM workflows yet.

Collaborator Author: I don't think a param is necessary because it is trivial to call the method instead. I only added this as a convenience to cater to the "serial only" user experience, but would gladly remove it.


// NewWorkflowSpecFactory returns a new NewWorkflowSpecFactory.
func NewWorkflowSpecFactory(
params NewWorkflowParams,
) *WorkflowSpecFactory {
@@ -128,6 +147,16 @@ func NewWorkflowSpecFactory(
// AddTo is meant to be called by generated code
func (step *Step[O]) AddTo(w *WorkflowSpecFactory) CapDefinition[O] {
stepDefinition := step.Definition

if w.serialMode {
// ensure we depend on each previous step
for _, prevRef := range w.prevRefs {
if !stepDefinition.Inputs.HasRef(prevRef) {
stepDefinition.Condition = fmt.Sprintf("$(%s.success)", prevRef)
}
}
}

stepRef := stepDefinition.Ref
if w.names[stepRef] && stepDefinition.CapabilityType != capabilities.CapabilityTypeTarget {
w.duplicateNames[stepRef] = true
@@ -152,7 +181,14 @@ func (step *Step[O]) AddTo(w *WorkflowSpecFactory) CapDefinition[O] {
w.badCapTypes = append(w.badCapTypes, stepDefinition.ID)
}

return &capDefinitionImpl[O]{ref: fmt.Sprintf("$(%s.outputs)", step.Definition.Ref)}
c := &capDefinitionImpl[O]{ref: fmt.Sprintf("$(%s.outputs)", step.Definition.Ref)}

if w.serialMode {
w.prevRefs = []string{step.Definition.Ref}
} else {
w.prevRefs = append(w.prevRefs, step.Definition.Ref)
}
return c
}
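
To make the serial-mode behavior concrete, here is an illustrative sketch of the step definition AddTo would produce. The field names come from this diff; the refs and values are hypothetical, loosely matching the serial chart below where get-bar follows get-foo:

```go
package main

import (
	"fmt"

	"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
	"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
)

func main() {
	// Illustrative only: in serial mode, a step whose inputs do not reference
	// the previously added step ("get-foo") gains an injected success condition.
	step := sdk.StepDefinition{
		ID:             "[email protected]",
		Ref:            "get-bar",
		Inputs:         sdk.StepInputs{Mapping: map[string]any{"Arg0": "$(trigger.outputs.cool_output)"}},
		Condition:      "$(get-foo.success)", // what AddTo injects, since get-bar does not reference get-foo
		CapabilityType: capabilities.CapabilityTypeAction,
	}
	fmt.Printf("%+v\n", step)
}
```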

// AccessField is meant to be used by generated code
77 changes: 1 addition & 76 deletions pkg/workflows/sdk/builder_test.go
@@ -7,7 +7,6 @@ import (
"github.com/stretchr/testify/require"
"sigs.k8s.io/yaml"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
ocr3 "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/ocr3cap"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/targets/chainwriter"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/streams"
@@ -205,81 +204,7 @@ func TestBuilder_ValidSpec(t *testing.T) {
actual, err := factory.Spec()
require.NoError(t, err)

expected := sdk.WorkflowSpec{
Name: "notccipethsep",
Owner: "0x00000000000000000000000000000000000000aa",
Triggers: []sdk.StepDefinition{
{
ID: "[email protected]",
Ref: "trigger",
Inputs: sdk.StepInputs{},
Config: map[string]any{"maxFrequencyMs": 5000},
CapabilityType: capabilities.CapabilityTypeTrigger,
},
},
Actions: make([]sdk.StepDefinition, 0),
Consensus: []sdk.StepDefinition{
{
ID: "[email protected]",
Ref: "data-feeds-report",
Inputs: sdk.StepInputs{
Mapping: map[string]any{"observations": []map[string]any{
{
"Metadata": map[string]any{
"MinRequiredSignatures": 1,
"Signers": []string{"$(trigger.outputs.Metadata.Signer)"},
},
"Payload": []map[string]any{
{
"BenchmarkPrice": "$(trigger.outputs.Payload.BuyPrice)",
"FeedID": anyFakeFeedID,
"FullReport": "$(trigger.outputs.Payload.FullReport)",
"ObservationTimestamp": "$(trigger.outputs.Payload.ObservationTimestamp)",
"ReportContext": "$(trigger.outputs.Payload.ReportContext)",
"Signatures": []string{"$(trigger.outputs.Payload.Signature)"},
},
},
"Timestamp": "$(trigger.outputs.Timestamp)",
},
}},
},
Config: map[string]any{
"aggregation_config": ocr3.DataFeedsConsensusConfigAggregationConfig{
AllowedPartialStaleness: "0.5",
Feeds: map[string]ocr3.FeedValue{
anyFakeFeedID: {
Deviation: "0.5",
Heartbeat: 3600,
},
},
},
"aggregation_method": "data_feeds",
"encoder": "EVM",
"encoder_config": ocr3.EncoderConfig{
"Abi": "(bytes32 FeedID, uint224 Price, uint32 Timestamp)[] Reports",
},
"report_id": "0001",
},
CapabilityType: capabilities.CapabilityTypeConsensus,
},
},
Targets: []sdk.StepDefinition{
{
ID: "[email protected]",
Inputs: sdk.StepInputs{
Mapping: map[string]any{"signed_report": "$(data-feeds-report.outputs)"},
},
Config: map[string]any{
"address": "0xE0082363396985ae2FdcC3a9F816A586Eed88416",
"deltaStage": "45s",
"schedule": "oneAtATime",
},
CapabilityType: capabilities.CapabilityTypeTarget,
},
},
}

testutils.AssertWorkflowSpec(t, expected, actual)
testutils.AssertWorkflowSpec(t, notStreamSepoliaWorkflowSpec, actual)
})

t.Run("duplicate names causes errors", func(t *testing.T) {
70 changes: 2 additions & 68 deletions pkg/workflows/sdk/compute_test.go
@@ -39,74 +39,8 @@ func TestCompute(t *testing.T) {

spec, err2 := workflow.Spec()
require.NoError(t, err2)
expectedSpec := sdk.WorkflowSpec{
Name: "name",
Owner: "owner",
Triggers: []sdk.StepDefinition{
{
ID: "[email protected]",
Ref: "trigger",
Inputs: sdk.StepInputs{},
Config: map[string]any{"maxFrequencyMs": 5000},
CapabilityType: capabilities.CapabilityTypeTrigger,
},
},
Actions: []sdk.StepDefinition{
{
ID: "[email protected]",
Ref: "Compute",
Inputs: sdk.StepInputs{
Mapping: map[string]any{"Arg0": "$(trigger.outputs)"},
},
Config: map[string]any{
"binary": "$(ENV.binary)",
"config": "$(ENV.config)",
},
CapabilityType: capabilities.CapabilityTypeAction,
},
},
Consensus: []sdk.StepDefinition{
{
ID: "[email protected]",
Ref: "data-feeds-report",
Inputs: sdk.StepInputs{
Mapping: map[string]any{"observations": "$(Compute.outputs.Value)"},
},
Config: map[string]any{
"aggregation_config": ocr3.DataFeedsConsensusConfigAggregationConfig{
AllowedPartialStaleness: "false",
Feeds: map[string]ocr3.FeedValue{
anyFakeFeedID: {
Deviation: "0.5",
Heartbeat: 3600,
},
},
},
"aggregation_method": "data_feeds",
"encoder": ocr3.EncoderEVM,
"encoder_config": ocr3.EncoderConfig{},
"report_id": "0001",
},
CapabilityType: capabilities.CapabilityTypeConsensus,
},
},
Targets: []sdk.StepDefinition{
{
ID: "[email protected]",
Inputs: sdk.StepInputs{
Mapping: map[string]any{"signed_report": "$(data-feeds-report.outputs)"},
},
Config: map[string]any{
"address": "0xE0082363396985ae2FdcC3a9F816A586Eed88416",
"deltaStage": "45s",
"schedule": "oneAtATime",
},
CapabilityType: capabilities.CapabilityTypeTarget,
},
},
}

testutils.AssertWorkflowSpec(t, expectedSpec, spec)
testutils.AssertWorkflowSpec(t, serialWorkflowSpec, spec)
})

t.Run("compute runs the function and returns the value", func(t *testing.T) {
@@ -133,7 +67,7 @@ func TestCompute(t *testing.T) {
func createWorkflow(fn func(_ sdk.Runtime, inputFeed notstreams.Feed) ([]streams.Feed, error)) *sdk.WorkflowSpecFactory {
workflow := sdk.NewWorkflowSpecFactory(sdk.NewWorkflowParams{
Owner: "owner",
Name: "name",
Name: "serial",
})

trigger := notstreams.TriggerConfig{MaxFrequencyMs: 5000}.New(workflow)
14 changes: 14 additions & 0 deletions pkg/workflows/sdk/helper_test.go
@@ -0,0 +1,14 @@
package sdk

import (
"testing"

"github.com/stretchr/testify/require"
)

func (w *WorkflowSpecFactory) MustSpec(t *testing.T) WorkflowSpec {
t.Helper()
s, err := w.Spec()
require.NoError(t, err)
return s
}
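
A minimal sketch of how a test inside package sdk (which already imports testing and require) might use this helper; the workflow setup is elided, and MustSpec simply wraps Spec with require.NoError:

```go
// Hypothetical test in package sdk; the factory would be populated with a
// trigger and steps via generated builders before calling MustSpec.
func TestMustSpecSketch(t *testing.T) {
	w := NewWorkflowSpecFactory(NewWorkflowParams{Owner: "owner", Name: "name"})
	// ... register a trigger and steps here ...
	spec := w.MustSpec(t) // fails the test immediately if Spec returns an error
	_ = spec
}
```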
8 changes: 8 additions & 0 deletions pkg/workflows/sdk/testdata/fixtures/charts/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# WorkflowSpec Charts

This directory contains WorkflowSpec chart golden files. They are validated against test data by TestWorkflowSpecFormatChart,
and can be regenerated by passing the `-update` flag:
```sh
go test -run=TestWorkflowSpecFormatChart ./pkg/workflows/sdk/ -update
```
You can also invoke go:generate on package sdk, which will do the same.
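
The exact directive is not shown in this diff, but it presumably looks something like the following Go comment in package sdk:

```go
// Assumed shape of the directive; it just re-runs the chart test with -update
// so the golden files under testdata/fixtures/charts are rewritten.
//go:generate go test -run=TestWorkflowSpecFormatChart -update
```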
26 changes: 26 additions & 0 deletions pkg/workflows/sdk/testdata/fixtures/charts/builder_parallel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
```mermaid
flowchart

trigger[\"<b>trigger</b><br>trigger<br><i>(basic-test-trigger[at]1.0.0)</i>"/]

compute["<b>compute</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
get-bar -- Value --> compute
get-baz -- Value --> compute
get-foo -- Value --> compute

get-bar["<b>get-bar</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
trigger -- cool_output --> get-bar

get-baz["<b>get-baz</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
trigger -- cool_output --> get-baz

get-foo["<b>get-foo</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
trigger -- cool_output --> get-foo

consensus[["<b>consensus</b><br>consensus<br><i>(offchain_reporting[at]1.0.0)</i>"]]
compute -- Value --> consensus

unnamed6[/"target<br><i>(id)</i>"\]
consensus --> unnamed6

```
28 changes: 28 additions & 0 deletions pkg/workflows/sdk/testdata/fixtures/charts/builder_serial.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
```mermaid
flowchart

trigger[\"<b>trigger</b><br>trigger<br><i>(basic-test-trigger[at]1.0.0)</i>"/]

compute["<b>compute</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
get-bar -- Value --> compute
get-baz -- Value --> compute
get-foo -- Value --> compute

get-bar["<b>get-bar</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
get-foo -..-> get-bar
trigger -- cool_output --> get-bar

get-baz["<b>get-baz</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
get-bar -..-> get-baz
trigger -- cool_output --> get-baz

get-foo["<b>get-foo</b><br>action<br><i>(custom_compute[at]1.0.0)</i>"]
trigger -- cool_output --> get-foo

consensus[["<b>consensus</b><br>consensus<br><i>(offchain_reporting[at]1.0.0)</i>"]]
compute -- Value --> consensus

unnamed6[/"target<br><i>(id)</i>"\]
consensus --> unnamed6

```