Skip to content

Commit

Permalink
Delegate workflow spec creation (#14365)
Browse files Browse the repository at this point in the history
* Delegate workflow spec creation

* Use checked in common

* exclude sourcegrapht

* fix broken tests

* imports fix

* Parse workflow earlier and store spec type in the DB

* lint

* Remove unused variable

* Rename SdkWorkflowSpec to SDKSpec

---------

Co-authored-by: skudasov <[email protected]>
  • Loading branch information
nolag and skudasov authored Sep 17, 2024
1 parent bc177cb commit a7348f0
Show file tree
Hide file tree
Showing 29 changed files with 404 additions and 57 deletions.
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/prometheus/client_golang v1.20.0
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907ce
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240916150342-36cb47701edf
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7
github.com/spf13/cobra v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1083,8 +1083,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240916150615-85b8aa5fa7e6 h1:cbHlV2CSphQ+ghDye21M8ym0aAO/Y649H2Mg60M2AuE=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240916150615-85b8aa5fa7e6/go.mod h1:Lv77O13ZxOdmlvnu2vaUC0Lg+t3JAL+N+9K8dRsgmDI=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907ce h1:qXS0aWiDFDoLRCB+kSGnzp77iYT2luflUyzE5BnNmpY=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913161926-ce5d667907ce/go.mod h1:sjiiPwd4KsYOCf68MwL86EKphdXeT66EY7j53WH5DCc=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240916150342-36cb47701edf h1:1AlTUkT5D8HmvU9bwDoIN54/EFyOnRBl7gnXZVrYXEA=
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240916150342-36cb47701edf/go.mod h1:l8NTByXUdGGJX+vyKYI6yX1/HIpM14F8Wm9BkU3Q4Qo=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q=
Expand Down
2 changes: 1 addition & 1 deletion core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error
case job.FluxMonitor:
js, err = fluxmonitorv2.ValidatedFluxMonitorSpec(s.jobCfg, spec)
case job.Workflow:
js, err = workflows.ValidatedWorkflowJobSpec(spec)
js, err = workflows.ValidatedWorkflowJobSpec(ctx, spec)
case job.CCIP:
js, err = ccip.ValidatedCCIPSpec(spec)
default:
Expand Down
8 changes: 7 additions & 1 deletion core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,7 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) {
spec: &job.WorkflowSpec{
ID: 1,
Workflow: pkgworkflows.WFYamlSpec(t, "workflow01", addr1),
SpecType: job.YamlSpec,
},
before: mustInsertWFJob,
},
Expand All @@ -1881,6 +1882,7 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) {
var c job.WorkflowSpec
c.ID = s.ID
c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow99", addr1) // insert with mismatched name
c.SpecType = job.YamlSpec
return mustInsertWFJob(t, o, &c)
},
},
Expand Down Expand Up @@ -1948,18 +1950,21 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) {
wfYaml1 := pkgworkflows.WFYamlSpec(t, "workflow00", addr1)
s1 := job.WorkflowSpec{
Workflow: wfYaml1,
SpecType: job.YamlSpec,
}
wantJobID1 := mustInsertWFJob(t, o, &s1)

wfYaml2 := pkgworkflows.WFYamlSpec(t, "workflow01", addr1)
s2 := job.WorkflowSpec{
Workflow: wfYaml2,
SpecType: job.YamlSpec,
}
wantJobID2 := mustInsertWFJob(t, o, &s2)

wfYaml3 := pkgworkflows.WFYamlSpec(t, "workflow00", addr2)
s3 := job.WorkflowSpec{
Workflow: wfYaml3,
SpecType: job.YamlSpec,
}
wantJobID3 := mustInsertWFJob(t, o, &s3)

Expand All @@ -1976,13 +1981,14 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) {
assert.EqualValues(t, j.WorkflowSpec.WorkflowID, s.WorkflowID)
assert.EqualValues(t, j.WorkflowSpec.WorkflowOwner, s.WorkflowOwner)
assert.EqualValues(t, j.WorkflowSpec.WorkflowName, s.WorkflowName)
assert.Equal(t, j.WorkflowSpec.SpecType, job.YamlSpec)
}
})
}

func mustInsertWFJob(t *testing.T, orm job.ORM, s *job.WorkflowSpec) int32 {
t.Helper()
err := s.Validate()
err := s.Validate(testutils.Context(t))
require.NoError(t, err, "failed to validate spec %v", s)
ctx := testutils.Context(t)
_, err = toml.Marshal(s.Workflow)
Expand Down
48 changes: 40 additions & 8 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package job

import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
Expand All @@ -14,9 +15,12 @@ import (
"github.com/pkg/errors"
"gopkg.in/guregu/null.v4"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"

commonassets "github.com/smartcontractkit/chainlink-common/pkg/assets"
"github.com/smartcontractkit/chainlink-common/pkg/types"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"

"github.com/smartcontractkit/chainlink/v2/core/services/relay"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
Expand Down Expand Up @@ -857,16 +861,26 @@ type LiquidityBalancerSpec struct {
LiquidityBalancerConfig string `toml:"liquidityBalancerConfig" db:"liquidity_balancer_config"`
}

type WorkflowSpecType string

const (
YamlSpec WorkflowSpecType = "yaml"
DefaultSpecType = YamlSpec
)

type WorkflowSpec struct {
ID int32 `toml:"-"`
Workflow string `toml:"workflow"` // the yaml representation of the workflow
Workflow string `toml:"workflow"` // the raw representation of the workflow
Config string `toml:"config"` // the raw representation of the config
// fields derived from the yaml spec, used for indexing the database
// note: i tried to make these private, but translating them to the database seems to require them to be public
WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow.
WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow.
WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow.
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow.
WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow.
WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow.
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `db:"spec_type"`
sdkWorkflow *sdk.WorkflowSpec
}

var (
Expand All @@ -879,14 +893,18 @@ const (
)

// Validate checks the workflow spec for correctness
func (w *WorkflowSpec) Validate() error {
func (w *WorkflowSpec) Validate(ctx context.Context) error {
s, err := pkgworkflows.ParseWorkflowSpecYaml(w.Workflow)
if err != nil {
return fmt.Errorf("%w: failed to parse workflow spec %s: %w", ErrInvalidWorkflowYAMLSpec, w.Workflow, err)
}

if _, err = w.SDKSpec(ctx); err != nil {
return err
}

w.WorkflowOwner = strings.TrimPrefix(s.Owner, "0x") // the json schema validation ensures it is a hex string with 0x prefix, but the database does not store the prefix
w.WorkflowName = s.Name
w.WorkflowID = s.CID()

if len(w.WorkflowID) != workflowIDLen {
return fmt.Errorf("%w: incorrect length for id %s: expected %d, got %d", ErrInvalidWorkflowID, w.WorkflowID, workflowIDLen, len(w.WorkflowID))
Expand All @@ -895,6 +913,20 @@ func (w *WorkflowSpec) Validate() error {
return nil
}

func (w *WorkflowSpec) SDKSpec(ctx context.Context) (sdk.WorkflowSpec, error) {
if w.sdkWorkflow != nil {
return *w.sdkWorkflow, nil
}

spec, cid, err := workflowSpecFactory.Spec(ctx, w.Workflow, []byte(w.Config), w.SpecType)
if err != nil {
return sdk.WorkflowSpec{}, err
}
w.sdkWorkflow = &spec
w.WorkflowID = cid
return spec, nil
}

type StandardCapabilitiesSpec struct {
ID int32
CreatedAt time.Time `toml:"-"`
Expand Down
4 changes: 3 additions & 1 deletion core/services/job/models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/codec"
"github.com/smartcontractkit/chainlink-common/pkg/types"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -322,7 +324,7 @@ func TestWorkflowSpec_Validate(t *testing.T) {
w := &WorkflowSpec{
Workflow: tt.fields.Workflow,
}
err := w.Validate()
err := w.Validate(testutils.Context(t))
require.Equal(t, tt.wantError, err != nil)
if !tt.wantError {
assert.NotEmpty(t, w.WorkflowID)
Expand Down
6 changes: 3 additions & 3 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
case Stream:
// 'stream' type has no associated spec, nothing to do here
case Workflow:
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW())
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at, spec_type)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW(), :spec_type)
RETURNING id;`
specID, err := tx.prepareQuerySpecID(ctx, sql, jb.WorkflowSpec)
if err != nil {
Expand Down Expand Up @@ -961,7 +961,7 @@ func (o *orm) FindJob(ctx context.Context, id int32) (jb Job, err error) {
return
}

// FindJobWithoutSpecErrors returns a job by ID, without loading Spec Errors preloaded
// FindJobWithoutSpecErrors returns a job by ID, without loading SpecVal Errors preloaded
func (o *orm) FindJobWithoutSpecErrors(ctx context.Context, id int32) (jb Job, err error) {
err = o.transact(ctx, true, func(tx *orm) error {
stmt := "SELECT jobs.*, job_pipeline_specs.pipeline_spec_id as pipeline_spec_id FROM jobs JOIN job_pipeline_specs ON (jobs.id = job_pipeline_specs.job_id) WHERE jobs.id = $1 LIMIT 1"
Expand Down
51 changes: 51 additions & 0 deletions core/services/job/workflow_spec_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package job

import (
"context"
"crypto/sha256"
"errors"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
)

var ErrInvalidWorkflowType = errors.New("invalid workflow type")

type SDKWorkflowSpecFactory interface {
Spec(ctx context.Context, rawSpec, config []byte) (sdk.WorkflowSpec, error)
RawSpec(ctx context.Context, wf string) ([]byte, error)
}

type WorkflowSpecFactory map[WorkflowSpecType]SDKWorkflowSpecFactory

func (wsf WorkflowSpecFactory) Spec(
ctx context.Context, workflow string, config []byte, tpe WorkflowSpecType) (sdk.WorkflowSpec, string, error) {
if tpe == "" {
tpe = DefaultSpecType
}

factory, ok := wsf[tpe]
if !ok {
return sdk.WorkflowSpec{}, "", ErrInvalidWorkflowType
}

rawSpec, err := factory.RawSpec(ctx, workflow)
if err != nil {
return sdk.WorkflowSpec{}, "", err
}

spec, err := factory.Spec(ctx, rawSpec, config)
if err != nil {
return sdk.WorkflowSpec{}, "", err
}

sum := sha256.New()
sum.Write(rawSpec)
sum.Write(config)

return spec, fmt.Sprintf("%x", sum.Sum(nil)), nil
}

var workflowSpecFactory = WorkflowSpecFactory{
YamlSpec: YAMLSpecFactory{},
}
106 changes: 106 additions & 0 deletions core/services/job/workflow_spec_factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package job_test

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
)

func TestWorkflowSpecFactory_ToSpec(t *testing.T) {
t.Parallel()

anyData := "any data"
anyConfig := []byte("any config")
anySpec := sdk.WorkflowSpec{Name: "name", Owner: "owner"}

t.Run("delegates to factory and calculates CID", func(t *testing.T) {
runYamlSpecTest(t, anySpec, anyData, anyConfig, job.YamlSpec)
})

t.Run("delegates default", func(t *testing.T) {
runYamlSpecTest(t, anySpec, anyData, anyConfig, "")
})

t.Run("CID without config matches", func(t *testing.T) {
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, noConfig: true, SpecVal: anySpec},
}
results, cid, err := factory.Spec(testutils.Context(t), anyData, nil, job.YamlSpec)
require.NoError(t, err)

assert.Equal(t, anySpec, results)

sha256Hash := sha256.New()
sha256Hash.Write([]byte(anyData))
expectedCid := fmt.Sprintf("%x", sha256Hash.Sum(nil))
assert.Equal(t, expectedCid, cid)
})

t.Run("returns errors from sdk factory", func(t *testing.T) {
anyErr := errors.New("nope")
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, Err: anyErr},
}

_, _, err := factory.Spec(testutils.Context(t), anyData, anyConfig, job.YamlSpec)
assert.Equal(t, anyErr, err)
})

t.Run("returns an error if the type is not supported", func(t *testing.T) {
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, SpecVal: anySpec},
}

_, _, err := factory.Spec(testutils.Context(t), anyData, anyConfig, "unsupported")
assert.Error(t, err)
})
}

func runYamlSpecTest(t *testing.T, anySpec sdk.WorkflowSpec, anyData string, anyConfig []byte, specType job.WorkflowSpecType) {
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, SpecVal: anySpec},
}

results, cid, err := factory.Spec(testutils.Context(t), anyData, anyConfig, specType)

require.NoError(t, err)
assert.Equal(t, anySpec, results)

sha256Hash := sha256.New()
sha256Hash.Write([]byte(anyData))
sha256Hash.Write(anyConfig)
expectedCid := fmt.Sprintf("%x", sha256Hash.Sum(nil))
assert.Equal(t, expectedCid, cid)
}

type mockSdkSpecFactory struct {
t *testing.T
noConfig bool
SpecVal sdk.WorkflowSpec
Err error
}

func (f mockSdkSpecFactory) RawSpec(_ context.Context, wf string) ([]byte, error) {
return []byte(wf), nil
}

func (f mockSdkSpecFactory) Spec(_ context.Context, rawSpec, config []byte) (sdk.WorkflowSpec, error) {
assert.ElementsMatch(f.t, rawSpec, []byte("any data"))
if f.noConfig {
assert.Nil(f.t, config)
} else {
assert.ElementsMatch(f.t, config, []byte("any config"))
}

return f.SpecVal, f.Err
}
20 changes: 20 additions & 0 deletions core/services/job/yaml_spec_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package job

import (
"context"

"github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
)

type YAMLSpecFactory struct{}

var _ SDKWorkflowSpecFactory = (*YAMLSpecFactory)(nil)

func (y YAMLSpecFactory) Spec(_ context.Context, rawSpec, _ []byte) (sdk.WorkflowSpec, error) {
return workflows.ParseWorkflowSpecYaml(string(rawSpec))
}

func (y YAMLSpecFactory) RawSpec(_ context.Context, wf string) ([]byte, error) {
return []byte(wf), nil
}
Loading

0 comments on commit a7348f0

Please sign in to comment.