Skip to content

Commit

Permalink
feat(workflows): adds registry syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Nov 17, 2024
1 parent c686783 commit bde0f2c
Show file tree
Hide file tree
Showing 14 changed files with 1,575 additions and 19 deletions.
15 changes: 15 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,21 @@ packages:
github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer:
interfaces:
ORM:
github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer:
interfaces:
ORM:
ContractReader:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: contract_reader_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
Handler:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: handler_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
github.com/smartcontractkit/chainlink/v2/core/capabilities/targets:
interfaces:
ContractValueGetter:
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

// TODO: wire this up to config so we only instantiate it
// if a workflow registry address is provided.
workflowRegistrySyncer := syncer.NewWorkflowRegistry()
workflowRegistrySyncer := syncer.NewWorkflowRegistry(nil, nil, nil, nil, "")
srvcs = append(srvcs, workflowRegistrySyncer)

var externalPeerWrapper p2ptypes.PeerWrapper
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (sucm *stepUpdateManager) len() int64 {
}

type secretsFetcher interface {
SecretsFor(workflowOwner, workflowName string) (map[string]string, error)
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
}

// Engine handles the lifecycle of a single workflow and its executions.
Expand Down Expand Up @@ -849,7 +849,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.name)
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string,

type mockSecretsFetcher struct{}

func (s mockSecretsFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) {
func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
return map[string]string{}, nil
}

Expand Down Expand Up @@ -1605,7 +1605,7 @@ type mockFetcher struct {
retval map[string]string
}

func (m *mockFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) {
func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) {
return m.retval, nil
}

Expand Down
148 changes: 148 additions & 0 deletions core/services/workflows/syncer/contract_reader_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 72 additions & 0 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package syncer

import (
"context"
"encoding/hex"
"fmt"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
type eventHandler struct {
lggr logger.Logger
orm ORM
fetcher FetcherFunc
}

// newEventHandler returns a new eventHandler instance.
func newEventHandler(
lggr logger.Logger,
orm ORM,
gateway FetcherFunc,
) *eventHandler {
return &eventHandler{
lggr: lggr,
orm: orm,
fetcher: gateway,
}
}

func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) error {
switch event.EventType {
case ForceUpdateSecretsEvent:
return h.forceUpdateSecretsEvent(ctx, event)
default:
return fmt.Errorf("event type unsupported: %v", event.EventType)
}
}

// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type.
func (h *eventHandler) forceUpdateSecretsEvent(
ctx context.Context,
event WorkflowRegistryEvent,
) error {
// Get the URL of the secrets file from the event data
data, ok := event.Data.(WorkflowRegistryForceUpdateSecretsRequestedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

hash := hex.EncodeToString(data.SecretsURLHash)

url, err := h.orm.GetSecretsURLByHash(ctx, hash)
if err != nil {
h.lggr.Errorf("failed to get URL by hash %s : %w", hash, err)
return err
}

// Fetch the contents of the secrets file from the url via the fetcher
secrets, err := h.fetcher(ctx, url)
if err != nil {
return err
}

// Update the secrets in the ORM
if _, err := h.orm.Update(ctx, hash, string(secrets)); err != nil {
return err
}

return nil
}
63 changes: 63 additions & 0 deletions core/services/workflows/syncer/heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package syncer

import "container/heap"

type Heap interface {
// Push adds a new item to the heap.
Push(x WorkflowRegistryEventResponse)

// Pop removes the smallest item from the heap and returns it.
Pop() WorkflowRegistryEventResponse

// Len returns the number of items in the heap.
Len() int
}

// publicHeap is a wrapper around the heap.Interface that exposes the Push and Pop methods.
type publicHeap[T any] struct {
heap heap.Interface
}

func (h *publicHeap[T]) Push(x T) {
heap.Push(h.heap, x)
}

func (h *publicHeap[T]) Pop() T {
return heap.Pop(h.heap).(T)
}

func (h *publicHeap[T]) Len() int {
return h.heap.Len()
}

// blockHeightHeap is a heap.Interface that sorts WorkflowRegistryEventResponses by block height.
type blockHeightHeap []WorkflowRegistryEventResponse

// newBlockHeightHeap returns an initialized heap that sorts WorkflowRegistryEventResponses by block height.
func newBlockHeightHeap() Heap {
h := blockHeightHeap(make([]WorkflowRegistryEventResponse, 0))
heap.Init(&h)
return &publicHeap[WorkflowRegistryEventResponse]{heap: &h}
}

func (h *blockHeightHeap) Len() int { return len(*h) }

func (h *blockHeightHeap) Less(i, j int) bool {
return (*h)[i].Event.Head.Height < (*h)[j].Event.Head.Height
}

func (h *blockHeightHeap) Swap(i, j int) {
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
}

func (h *blockHeightHeap) Push(x any) {
*h = append(*h, x.(WorkflowRegistryEventResponse))
}

func (h *blockHeightHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
Loading

0 comments on commit bde0f2c

Please sign in to comment.