Skip to content

Commit

Permalink
feat(workflows): adds registry syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Nov 18, 2024
1 parent c686783 commit 6611ffd
Show file tree
Hide file tree
Showing 18 changed files with 2,026 additions and 19 deletions.
15 changes: 15 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,21 @@ packages:
github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer:
interfaces:
ORM:
github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer:
interfaces:
ORM:
ContractReader:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: contract_reader_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
Handler:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: handler_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
github.com/smartcontractkit/chainlink/v2/core/capabilities/targets:
interfaces:
ContractValueGetter:
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

// TODO: wire this up to config so we only instantiate it
// if a workflow registry address is provided.
workflowRegistrySyncer := syncer.NewWorkflowRegistry()
workflowRegistrySyncer := syncer.NewWorkflowRegistry(nil, nil, nil, nil, "")
srvcs = append(srvcs, workflowRegistrySyncer)

var externalPeerWrapper p2ptypes.PeerWrapper
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package workflow_registry_syncer_test

import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper"
coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"
"github.com/smartcontractkit/chainlink/v2/core/utils/signalers"

"github.com/stretchr/testify/require"
)

func Test_SecretsWorker(t *testing.T) {
var (
ctx = coretestutils.Context(t)
lggr = logger.TestLogger(t)
backendTH = testutils.NewEVMBackendTH(t)
db = pgtest.NewSqlxDB(t)
orm = syncer.NewWorkflowRegistryDS(db, lggr)

giveTicker = signalers.MakeTicker(ctx.Done(), 500*time.Millisecond)
giveSecretsURL = "https://original-url.com"
donID = uint32(1)
giveWorkflow = RegisterWorkflowCMD{
Name: "test-wf",
DonID: donID,
Status: uint8(0),
SecretsURL: giveSecretsURL,
}
giveContents = "contents"
wantContents = "updated contents"
fetcherFn = func(_ context.Context, _ string) ([]byte, error) {
return []byte(wantContents), nil
}
contractName = syncer.ContractName
forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent)
)

// fill ID with randomd data
var giveID [32]byte
rand.Read((giveID)[:])
giveWorkflow.ID = giveID

// Deploy a test workflow_registry
wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client())
require.NoError(t, err)

lggr.Infof("deployed workflow registry at %s\n", wfRegistryAddr.Hex())

// Build the ContractReader config
contractReaderCfg := evmtypes.ChainReaderConfig{
Contracts: map[string]evmtypes.ChainContractReader{
contractName: {
ContractPollingFilter: evmtypes.ContractPollingFilter{
GenericEventNames: []string{forceUpdateSecretsEvent},
},
ContractABI: workflow_registry_wrapper.WorkflowRegistryABI,
Configs: map[string]*evmtypes.ChainReaderDefinition{
forceUpdateSecretsEvent: {
ChainSpecificName: forceUpdateSecretsEvent,
ReadType: evmtypes.Event,
},
},
},
},
}

contractReaderCfgBytes, err := json.Marshal(contractReaderCfg)
require.NoError(t, err)

contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes)
require.NoError(t, err)

err = contractReader.Bind(ctx, []types.BoundContract{{Name: contractName, Address: wfRegistryAddr.Hex()}})
require.NoError(t, err)

// Seed the DB
hash, err := crypto.Keccak256(append(backendTH.ContractsOwner.From[:], []byte(giveSecretsURL)...))
require.NoError(t, err)
giveHash := hex.EncodeToString(hash)

gotID, err := orm.Create(ctx, giveSecretsURL, giveHash, giveContents)
require.NoError(t, err)

gotSecretsURL, err := orm.GetSecretsURLByID(ctx, gotID)
require.NoError(t, err)
require.Equal(t, giveSecretsURL, gotSecretsURL)

// verify the DB
contents, err := orm.GetContents(ctx, giveSecretsURL)
require.NoError(t, err)
require.Equal(t, contents, giveContents)

worker := syncer.NewWorkflowRegistry(
lggr,
orm,
contractReader,
fetcherFn,
wfRegistryAddr.Hex(),
syncer.WithTicker(giveTicker),
)

// generate a log event
updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true)
updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{1}, true)
registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow)

servicetest.Run(t, worker)

go func() {
for {
<-time.After(time.Second)
requestForceUpdateSecrets(t, backendTH, wfRegistryC, giveSecretsURL)
backendTH.Backend.Commit()
}
}()

// Require the secrets contents to eventually be updated
require.Eventually(t, func() bool {
secrets, err := orm.GetContents(ctx, giveSecretsURL)
lggr.Debugf("got secrets %v", secrets)
require.NoError(t, err)
return secrets == wantContents
}, 5*time.Second, time.Second)
}

func updateAuthorizedAddress(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
addresses []common.Address,
allowed bool,
) {
t.Helper()
_, err := wfRegC.UpdateAuthorizedAddresses(th.ContractsOwner, addresses, allowed)
require.NoError(t, err, "failed to update authorised addresses")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

func updateAllowedDONs(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
donIDs []uint32,
allowed bool,
) {
t.Helper()
_, err := wfRegC.UpdateAllowedDONs(th.ContractsOwner, donIDs, allowed)
require.NoError(t, err, "failed to update DONs")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

type RegisterWorkflowCMD struct {
Name string
ID [32]byte
DonID uint32
Status uint8
BinaryURL string
ConfigURL string
SecretsURL string
}

func registerWorkflow(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
input RegisterWorkflowCMD,
) {
t.Helper()
_, err := wfRegC.RegisterWorkflow(th.ContractsOwner, input.Name, input.ID, input.DonID,
input.Status, input.BinaryURL, input.ConfigURL, input.SecretsURL)
require.NoError(t, err, "failed to register workflow")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

func requestForceUpdateSecrets(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
secretsURL string,
) {
_, err := wfRegC.RequestForceUpdateSecrets(th.ContractsOwner, secretsURL)
require.NoError(t, err)
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

func miner(t *testing.T, tb *testutils.EVMBackendTH, d time.Duration) {
t.Helper()

ctx := coretestutils.Context(t)
for {
select {
case <-ctx.Done():
return
case <-time.NewTicker(d).C:
tb.Backend.Commit()
}
}
}
4 changes: 2 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (sucm *stepUpdateManager) len() int64 {
}

type secretsFetcher interface {
SecretsFor(workflowOwner, workflowName string) (map[string]string, error)
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
}

// Engine handles the lifecycle of a single workflow and its executions.
Expand Down Expand Up @@ -849,7 +849,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.name)
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string,

type mockSecretsFetcher struct{}

func (s mockSecretsFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) {
func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
return map[string]string{}, nil
}

Expand Down Expand Up @@ -1605,7 +1605,7 @@ type mockFetcher struct {
retval map[string]string
}

func (m *mockFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) {
func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
return m.retval, nil
}

Expand Down
Loading

0 comments on commit 6611ffd

Please sign in to comment.