From 12b8a48db221104517fb6d26faa64ba648f9c8d3 Mon Sep 17 00:00:00 2001 From: Cedric Date: Fri, 15 Mar 2024 11:44:53 +0000 Subject: [PATCH 1/6] [KS-86] Add handler to execute single step (#12394) * Add hardcoded YAML workflow and parsing * Execute an individual step * Execute an individual step --- core/services/workflows/engine.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 3261bdd3fce..3ee3c0ad4ad 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -273,21 +273,21 @@ func (e *Engine) Close() error { func NewEngine(lggr logger.Logger, registry types.CapabilitiesRegistry) (engine *Engine, err error) { yamlWorkflowSpec := ` -triggers: - - type: "on_mercury_report" - ref: report_data - config: - feedlist: - - "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD - - "0x2222222222222222222200000000000000000000000000000000000000000000" # LINKUSD - - "0x3333333333333333333300000000000000000000000000000000000000000000" # BTCUSD - -consensus: - - type: "offchain_reporting" - ref: evm_median - inputs: - observations: - - $(report_data.outputs) + triggers: + - type: "on_mercury_report" + ref: report_data + config: + feedlist: + - "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD + - "0x2222222222222222222200000000000000000000000000000000000000000000" # LINKUSD + - "0x3333333333333333333300000000000000000000000000000000000000000000" # BTCUSD + + consensus: + - type: "offchain_reporting" + ref: evm_median + inputs: + observations: + - $(report_data.outputs) config: aggregation_method: data_feeds_2_0 aggregation_config: From 921cc71b7d78bc014baad1876d47e1fb41f98c8d Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Tue, 12 Mar 2024 13:41:52 +0000 Subject: [PATCH 2/6] Execute an individual step --- core/services/workflows/engine.go | 30 +++++++++++++++--------------- core/services/workflows/state.go | 1 - 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 3ee3c0ad4ad..3261bdd3fce 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -273,21 +273,21 @@ func (e *Engine) Close() error { func NewEngine(lggr logger.Logger, registry types.CapabilitiesRegistry) (engine *Engine, err error) { yamlWorkflowSpec := ` - triggers: - - type: "on_mercury_report" - ref: report_data - config: - feedlist: - - "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD - - "0x2222222222222222222200000000000000000000000000000000000000000000" # LINKUSD - - "0x3333333333333333333300000000000000000000000000000000000000000000" # BTCUSD - - consensus: - - type: "offchain_reporting" - ref: evm_median - inputs: - observations: - - $(report_data.outputs) +triggers: + - type: "on_mercury_report" + ref: report_data + config: + feedlist: + - "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD + - "0x2222222222222222222200000000000000000000000000000000000000000000" # LINKUSD + - "0x3333333333333333333300000000000000000000000000000000000000000000" # BTCUSD + +consensus: + - type: "offchain_reporting" + ref: evm_median + inputs: + observations: + - $(report_data.outputs) config: aggregation_method: data_feeds_2_0 aggregation_config: diff --git a/core/services/workflows/state.go b/core/services/workflows/state.go index e002fa90501..95b488db5b1 100644 --- a/core/services/workflows/state.go +++ b/core/services/workflows/state.go @@ -84,7 +84,6 @@ func interpolateKey(key string, state *executionState) (any, error) { if d < 0 { return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`: index %d must be a positive number", r, v, d) } - val = v[d] default: return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`", r, val) From 6dafba136311e9cdaf0ee406a43c15aeecc0691f Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Thu, 14 Mar 2024 11:25:13 +0000 Subject: [PATCH 3/6] [KS-87] Add scheduler to workflow engine --- core/services/workflows/delegate.go | 57 ++- core/services/workflows/engine.go | 508 +++++++++++++++-------- core/services/workflows/engine_test.go | 7 +- core/services/workflows/graph.go | 62 +++ core/services/workflows/graph_test.go | 49 +++ core/services/workflows/queue.go | 53 +++ core/services/workflows/queue_test.go | 31 ++ core/services/workflows/state.go | 105 ++++- core/services/workflows/state_test.go | 32 +- core/services/workflows/store.go | 52 +++ core/services/workflows/workflow.go | 129 +++++- core/services/workflows/workflow_test.go | 191 +++++++++ 12 files changed, 1062 insertions(+), 214 deletions(-) create mode 100644 core/services/workflows/graph.go create mode 100644 core/services/workflows/graph_test.go create mode 100644 core/services/workflows/queue.go create mode 100644 core/services/workflows/queue_test.go create mode 100644 core/services/workflows/store.go create mode 100644 core/services/workflows/workflow_test.go diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index a54a33e9f0d..c1bff37a33f 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -15,6 +15,56 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) +const hardcodedWorkflow = ` +triggers: + - type: "on_mercury_report" + config: + feedlist: + - "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD + - "0x2222222222222222222200000000000000000000000000000000000000000000" # LINKUSD + - "0x3333333333333333333300000000000000000000000000000000000000000000" # BTCUSD + +consensus: + - type: "offchain_reporting" + ref: evm_median + inputs: + observations: + - $(trigger.outputs) + config: + aggregation_method: data_feeds_2_0 + aggregation_config: + 0x1111111111111111111100000000000000000000000000000000000000000000: + deviation: "0.001" + heartbeat: "30m" + 0x2222222222222222222200000000000000000000000000000000000000000000: + deviation: "0.001" + heartbeat: "30m" + 0x3333333333333333333300000000000000000000000000000000000000000000: + deviation: "0.001" + heartbeat: "30m" + encoder: EVM + encoder_config: + abi: "mercury_reports bytes[]" + +targets: + - type: write_polygon-testnet-mumbai + inputs: + report: + - $(evm_median.outputs.reports) + config: + address: "0x3F3554832c636721F1fD1822Ccca0354576741Ef" + params: [($inputs.report)] + abi: "receive(report bytes)" + - type: write_ethereum-testnet-sepolia + inputs: + report: + - $(evm_median.outputs.reports) + config: + address: "0x54e220867af6683aE6DcBF535B4f952cB5116510" + params: ["$(inputs.report)"] + abi: "receive(report bytes)" +` + type Delegate struct { registry types.CapabilitiesRegistry logger logger.Logger @@ -36,7 +86,12 @@ func (d *Delegate) OnDeleteJob(ctx context.Context, jb job.Job, q pg.Queryer) er // ServicesForSpec satisfies the job.Delegate interface. func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) { - engine, err := NewEngine(d.logger, d.registry) + cfg := Config{ + Lggr: d.logger, + Spec: hardcodedWorkflow, + Registry: d.registry, + } + engine, err := NewEngine(cfg) if err != nil { return nil, err } diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 3261bdd3fce..fa3017db6ad 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -3,8 +3,11 @@ package workflows import ( "context" "fmt" + "sync" "time" + "github.com/google/uuid" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -21,98 +24,126 @@ const ( type Engine struct { services.StateMachine - logger logger.Logger - registry types.CapabilitiesRegistry - trigger capabilities.TriggerCapability - consensus capabilities.ConsensusCapability - targets []capabilities.TargetCapability - workflow *Workflow - callbackCh chan capabilities.CapabilityResponse - cancel func() + logger logger.Logger + registry types.CapabilitiesRegistry + workflow *workflow + store *store + queue *queue[stepRequest] + callbackCh chan capabilities.CapabilityResponse + newWorkerCh chan struct{} + stepUpdateCh chan stepState + wg sync.WaitGroup + stopCh services.StopChan } func (e *Engine) Start(ctx context.Context) error { return e.StartOnce("Engine", func() error { // create a new context, since the one passed in via Start is short-lived. - ctx, cancel := context.WithCancel(context.Background()) - e.cancel = cancel + ctx, _ := e.stopCh.NewCtx() + + // queue.start will add to the wg and + // spin off a goroutine. + e.queue.start(ctx, &e.wg) + + e.wg.Add(2) go e.init(ctx) - go e.triggerHandlerLoop(ctx) + go e.loop(ctx) + return nil }) } func (e *Engine) init(ctx context.Context) { + defer e.wg.Done() + retrySec := 5 ticker := time.NewTicker(time.Duration(retrySec) * time.Second) defer ticker.Stop() - // Note: in our hardcoded workflow, there is only one trigger, - // and one consensus step. - trigger := e.workflow.Triggers[0] - consensus := e.workflow.Consensus[0] - - var err error + initSuccessful := true LOOP: for { select { case <-ctx.Done(): return case <-ticker.C: - e.trigger, err = e.registry.GetTrigger(ctx, trigger.Type) - if err != nil { - e.logger.Errorf("failed to get trigger capability: %s, retrying in %d seconds", err, retrySec) - break + for _, t := range e.workflow.triggers { + cp, err := e.registry.GetTrigger(ctx, t.Type) + if err != nil { + initSuccessful = false + e.logger.Errorf("failed to get trigger capability: %s, retrying in %d seconds", err, retrySec) + } else { + t.cachedTrigger = cp + } } - e.consensus, err = e.registry.GetConsensus(ctx, consensus.Type) - if err != nil { - e.logger.Errorf("failed to get consensus capability: %s, retrying in %d seconds", err, retrySec) - break - } - failed := false - e.targets = make([]capabilities.TargetCapability, len(e.workflow.Targets)) - for i, target := range e.workflow.Targets { - e.targets[i], err = e.registry.GetTarget(ctx, target.Type) - if err != nil { - e.logger.Errorf("failed to get target capability: %s, retrying in %d seconds", err, retrySec) - failed = true - break + err := e.workflow.walkDo(keywordTrigger, func(n *node) error { + // The graph contains a dummy node for triggers, but + // we handle triggers separately since there might be more than one. + if n.Ref == keywordTrigger { + return nil + } + + if n.cachedCapability != nil { + return nil + } + + cp, innerErr := e.registry.Get(ctx, n.Type) + if innerErr != nil { + return fmt.Errorf("failed to get capability with ref %s: %s, retrying in %d seconds", n.Type, innerErr, retrySec) + } + + cc, ok := cp.(capabilities.CallbackExecutable) + if !ok { + return fmt.Errorf("could not coerce capability %s to CallbackExecutable", n.Type) + } + + if n.cachedConfig == nil { + configMap, ierr := values.NewMap(n.Config) + if innerErr != nil { + return fmt.Errorf("failed to convert config to values.Map: %s", ierr) + } + n.cachedConfig = configMap + } + + reg := capabilities.RegisterToWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: mockedWorkflowID, + }, + Config: n.cachedConfig, } + + innerErr = cc.RegisterToWorkflow(ctx, reg) + if innerErr != nil { + return fmt.Errorf("failed to register to workflow: %+v", reg) + } + + n.cachedCapability = cc + return nil + }) + if err != nil { + initSuccessful = false + e.logger.Error(err) } - if !failed { + + if initSuccessful { break LOOP } } } - // we have all needed capabilities, now we can register for trigger events - err = e.registerTrigger(ctx) - if err != nil { - e.logger.Errorf("failed to register trigger: %s", err) - } - - // also register for consensus - cm, err := values.NewMap(consensus.Config) - if err != nil { - e.logger.Errorf("failed to convert config to values.Map: %s", err) - } - reg := capabilities.RegisterToWorkflowRequest{ - Metadata: capabilities.RegistrationMetadata{ - WorkflowID: mockedWorkflowID, - }, - Config: cm, - } - err = e.consensus.RegisterToWorkflow(ctx, reg) - if err != nil { - e.logger.Errorf("failed to register consensus: %s", err) + // We have all needed capabilities, now we can register for trigger events + for _, t := range e.workflow.triggers { + err := e.registerTrigger(ctx, t) + if err != nil { + e.logger.Errorf("failed to register trigger: %s", err) + } } e.logger.Info("engine initialized") } -func (e *Engine) registerTrigger(ctx context.Context) error { - trigger := e.workflow.Triggers[0] +func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) error { triggerInputs, err := values.NewMap( map[string]any{ "triggerId": mockedTriggerID, @@ -122,7 +153,7 @@ func (e *Engine) registerTrigger(ctx context.Context) error { return err } - tc, err := values.NewMap(trigger.Config) + tc, err := values.NewMap(t.Config) if err != nil { return err } @@ -134,204 +165,323 @@ func (e *Engine) registerTrigger(ctx context.Context) error { Config: tc, Inputs: triggerInputs, } - err = e.trigger.RegisterTrigger(ctx, e.callbackCh, triggerRegRequest) + err = t.cachedTrigger.RegisterTrigger(ctx, e.callbackCh, triggerRegRequest) if err != nil { - return fmt.Errorf("failed to instantiate mercury_trigger, %s", err) + return fmt.Errorf("failed to instantiate trigger %s, %s", t.Type, err) } return nil } -func (e *Engine) triggerHandlerLoop(ctx context.Context) { +// loop is the synchronization goroutine for the engine, and is responsible for: +// - dispatching new workers up to the limit specified (default = 100) +// - starting a new execution when a trigger emits a message on `callbackCh` +// - updating the `executionState` with the outcome of a `step`. +// +// Note: `executionState` is only mutated by this loop directly. +// This is important to avoid data races, and any accesses of `executionState` by any other +// goroutine should happen via a `stepRequest` message containing a copy of the latest +// `executionState`. This works because a worker thread for a given step will only +// be spun up once all dependent steps have completed (guaranteeing that the state associated +// with those dependent steps will no longer change). Therefore as long this worker thread only +// accesses data from dependent states, the data will never be stale. +func (e *Engine) loop(ctx context.Context) { + defer e.wg.Done() for { select { case <-ctx.Done(): return case resp := <-e.callbackCh: - go e.handleExecution(ctx, resp) + if resp.Err != nil { + e.logger.Errorf("trigger event was an error; not executing", resp.Err) + } else { + e.startExecution(ctx, resp.Value) + } + case dm := <-e.queue.out: + <-e.newWorkerCh + e.wg.Add(1) + go e.workerForStep(ctx, dm) + case stepUpdate := <-e.stepUpdateCh: + // Executed synchronously to ensure we correctly schedule subsequent tasks. + err := e.handleStepUpdate(ctx, stepUpdate) + if err != nil { + e.logger.Errorf("failed to update step state: %+v, %s", stepUpdate, err) + } } } } -func (e *Engine) handleExecution(ctx context.Context, event capabilities.CapabilityResponse) { - e.logger.Debugw("executing on a trigger event", "event", event) - trigger := e.workflow.Triggers[0] - if event.Err != nil { - e.logger.Errorf("trigger event was an error; not executing", event.Err) - return - } - +func (e *Engine) startExecution(ctx context.Context, event values.Value) { + executionID := uuid.New().String() + e.logger.Debugw("executing on a trigger event", "event", event, "executionID", executionID) ec := &executionState{ steps: map[string]*stepState{ - trigger.Ref: { + keywordTrigger: { outputs: &stepOutput{ - value: event.Value, + value: event, }, + status: statusCompleted, }, }, workflowID: mockedWorkflowID, - executionID: mockedExecutionID, + executionID: executionID, + status: statusStarted, } - consensus := e.workflow.Consensus[0] - err := e.handleStep(ctx, ec, consensus) + err := e.store.add(ctx, ec) if err != nil { - e.logger.Errorf("error in handleConsensus %v", err) return } - for _, trg := range e.workflow.Targets { - err := e.handleStep(ctx, ec, trg) + // Find the tasks we need to fire when a trigger has fired and enqueue them. + for _, node := range e.workflow.adjacentNodes(keywordTrigger) { + e.logger.Debugw("step request enqueued", "ref", node.Ref, "executionID", executionID) + e.queue.in <- stepRequest{state: copyState(*ec), stepRef: node.Ref} + } +} + +func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate stepState) error { + state, err := e.store.updateStep(ctx, &stepUpdate) + if err != nil { + return err + } + + switch stepUpdate.status { + case statusCompleted: + adjacentNodes := e.workflow.adjacentNodes(stepUpdate.ref) + // There are no nodes left to process in the current path, so let's check if + // we've completed the workflow. + // If not, we'll check adjacent nodes for any that are ready to process. + if len(adjacentNodes) == 0 { + workflowCompleted := true + err := e.workflow.walkDo(keywordTrigger, func(n *node) error { + step, ok := state.steps[n.Ref] + if !ok { + workflowCompleted = false + return nil + } + + switch step.status { + case statusCompleted, statusErrored: + default: + workflowCompleted = false + } + return nil + }) + if err != nil { + return err + } + + if workflowCompleted { + err := e.store.updateStatus(ctx, state.executionID, statusCompleted) + if err != nil { + return err + } + } + } + + for _, node := range adjacentNodes { + var anyNotCompleted bool + for _, dr := range node.dependencies { + step, ok := state.steps[dr] + if !ok { + return fmt.Errorf("could not locate dependency %s in %+v", dr, state) + } + + if step.status != statusCompleted { + anyNotCompleted = true + } + } + + if !anyNotCompleted { + e.queue.in <- stepRequest{ + state: copyState(state), + stepRef: node.Ref, + } + } + } + case statusErrored: + err := e.store.updateStatus(ctx, state.executionID, statusErrored) if err != nil { - e.logger.Errorf("error in handleTargets %v", err) - return + return err } } + + return nil } -func (e *Engine) handleStep(ctx context.Context, es *executionState, node Capability) error { +func (e *Engine) workerForStep(ctx context.Context, msg stepRequest) { + defer e.wg.Done() + + e.logger.Debugw("executing on a step event", "event", msg, "executionID", msg.state.executionID) stepState := &stepState{ - outputs: &stepOutput{}, + outputs: &stepOutput{}, + executionID: msg.state.executionID, + ref: msg.stepRef, } - es.steps[node.Ref] = stepState - // Let's get the capability. If we fail here, we'll bail out - // and try to handle it at the next execution. - cp, err := e.registry.Get(ctx, node.Type) + inputs, outputs, err := e.handleStep(ctx, msg) if err != nil { - return err + e.logger.Errorf("error executing step request: %w", err, "executionID", msg.state.executionID, "stepRef", msg.stepRef) + stepState.outputs.err = err + stepState.status = statusErrored + } else { + stepState.outputs.value = outputs + stepState.status = statusCompleted + e.logger.Debugw("step executed successfully", "executionID", msg.state.executionID, "stepRef", msg.stepRef, "outputs", outputs) } - api, ok := cp.(capabilities.CallbackExecutable) + stepState.inputs = inputs + + e.stepUpdateCh <- *stepState + e.newWorkerCh <- struct{}{} +} + +func (e *Engine) handleStep(ctx context.Context, msg stepRequest) (*values.Map, values.Value, error) { + node, ok := e.workflow.nodes[msg.stepRef] if !ok { - return fmt.Errorf("capability %s must be an action, consensus or target", node.Type) + return nil, nil, fmt.Errorf("could not get step for %s", msg.stepRef) } - i, err := findAndInterpolateAllKeys(node.Inputs, es) + i, err := findAndInterpolateAllKeys(node.Inputs, msg.state) if err != nil { - return err + return nil, nil, err } inputs, err := values.NewMap(i.(map[string]any)) if err != nil { - return err - } - - stepState.inputs = inputs - - config, err := values.NewMap(node.Config) - if err != nil { - return err + return nil, nil, err } tr := capabilities.CapabilityRequest{ Inputs: inputs, - Config: config, + Config: node.cachedConfig, Metadata: capabilities.RequestMetadata{ - WorkflowID: es.workflowID, - WorkflowExecutionID: es.executionID, + WorkflowID: msg.state.workflowID, + WorkflowExecutionID: msg.state.executionID, }, } - resp, err := capabilities.ExecuteSync(ctx, api, tr) + resp, err := capabilities.ExecuteSync(ctx, node.cachedCapability, tr) if err != nil { - stepState.outputs.err = err - return err + return inputs, nil, err } // `ExecuteSync` returns a `values.List` even if there was // just one return value. If that is the case, let's unwrap the // single value to make it easier to use in -- for example -- variable interpolation. if len(resp.Underlying) > 1 { - stepState.outputs.value = resp - } else { - stepState.outputs.value = resp.Underlying[0] + return inputs, resp, err } - return nil + return inputs, resp.Underlying[0], err +} + +func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability) error { + triggerInputs, err := values.NewMap( + map[string]any{ + "triggerId": mockedTriggerID, + }, + ) + if err != nil { + return err + } + deregRequest := capabilities.CapabilityRequest{ + Metadata: capabilities.RequestMetadata{ + WorkflowID: mockedWorkflowID, + }, + Inputs: triggerInputs, + } + return t.cachedTrigger.UnregisterTrigger(context.Background(), deregRequest) } func (e *Engine) Close() error { return e.StopOnce("Engine", func() error { - defer e.cancel() + ctx := context.Background() + // To shut down the engine, we'll start by deregistering + // any triggers to ensure no new executions are triggered, + // then we'll close down any background goroutines, + // and finally, we'll deregister any workflow steps. + for _, t := range e.workflow.triggers { + err := e.deregisterTrigger(ctx, t) + if err != nil { + return err + } + } - triggerInputs, err := values.NewMap( - map[string]any{ - "triggerId": mockedTriggerID, - }, - ) + close(e.stopCh) + e.wg.Wait() + + err := e.workflow.walkDo(keywordTrigger, func(n *node) error { + if n.Ref == keywordTrigger { + return nil + } + + reg := capabilities.UnregisterFromWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: mockedWorkflowID, + }, + Config: n.cachedConfig, + } + + innerErr := n.cachedCapability.UnregisterFromWorkflow(ctx, reg) + if innerErr != nil { + return fmt.Errorf("failed to unregister from workflow: %+v", reg) + } + + return nil + }) if err != nil { return err } - deregRequest := capabilities.CapabilityRequest{ - Metadata: capabilities.RequestMetadata{ - WorkflowID: mockedWorkflowID, - }, - Inputs: triggerInputs, - } - return e.trigger.UnregisterTrigger(context.Background(), deregRequest) + + return nil }) } -func NewEngine(lggr logger.Logger, registry types.CapabilitiesRegistry) (engine *Engine, err error) { - yamlWorkflowSpec := ` -triggers: - - type: "on_mercury_report" - ref: report_data - config: - feedlist: - - "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD - - "0x2222222222222222222200000000000000000000000000000000000000000000" # LINKUSD - - "0x3333333333333333333300000000000000000000000000000000000000000000" # BTCUSD - -consensus: - - type: "offchain_reporting" - ref: evm_median - inputs: - observations: - - $(report_data.outputs) - config: - aggregation_method: data_feeds_2_0 - aggregation_config: - 0x1111111111111111111100000000000000000000000000000000000000000000: - deviation: "0.001" - heartbeat: "30m" - 0x2222222222222222222200000000000000000000000000000000000000000000: - deviation: "0.001" - heartbeat: "30m" - 0x3333333333333333333300000000000000000000000000000000000000000000: - deviation: "0.001" - heartbeat: "30m" - encoder: EVM - encoder_config: - abi: "mercury_reports bytes[]" - -targets: - - type: write_polygon-testnet-mumbai - inputs: - report: - - $(evm_median.outputs.reports) - config: - address: "0x3F3554832c636721F1fD1822Ccca0354576741Ef" - params: [($inputs.report)] - abi: "receive(report bytes)" - - type: write_ethereum-testnet-sepolia - inputs: - report: - - $(evm_median.outputs.reports) - config: - address: "0x54e220867af6683aE6DcBF535B4f952cB5116510" - params: ["$(inputs.report)"] - abi: "receive(report bytes)" -` - - workflow, err := Parse(yamlWorkflowSpec) +type Config struct { + Spec string + Lggr logger.Logger + Registry types.CapabilitiesRegistry + MaxWorkerLimit int +} + +const ( + defaultWorkerLimit = 100 +) + +func NewEngine(cfg Config) (engine *Engine, err error) { + if cfg.MaxWorkerLimit == 0 { + cfg.MaxWorkerLimit = defaultWorkerLimit + } + // TODO: validation of the workflow spec + // We'll need to check, among other things: + // - that there are no node `ref` called `trigger` as this is reserved for any triggers + // - that there are no duplicate `ref`s + // - that the `ref` for any triggers is empty -- and filled in with `trigger` + // - etc. + + workflow, err := Parse(cfg.Spec) if err != nil { return nil, err } + + // Instantiate semaphore to put a limit on the number of workers + newWorkerCh := make(chan struct{}, cfg.MaxWorkerLimit) + for i := 0; i < cfg.MaxWorkerLimit; i++ { + newWorkerCh <- struct{}{} + } + + var wg sync.WaitGroup engine = &Engine{ - logger: lggr.Named("WorkflowEngine"), - registry: registry, - workflow: workflow, - callbackCh: make(chan capabilities.CapabilityResponse), + logger: cfg.Lggr.Named("WorkflowEngine"), + registry: cfg.Registry, + workflow: workflow, + store: newStore(), + queue: newQueue[stepRequest](), + newWorkerCh: newWorkerCh, + stepUpdateCh: make(chan stepState), + callbackCh: make(chan capabilities.CapabilityResponse), + wg: wg, + stopCh: make(chan struct{}), } return engine, nil } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 74a2093c0d2..9e7f6a7f80f 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -138,7 +138,12 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) { require.NoError(t, reg.Add(ctx, target2)) lggr := logger.TestLogger(t) - eng, err := NewEngine(lggr, reg) + cfg := Config{ + Lggr: lggr, + Registry: reg, + Spec: hardcodedWorkflow, + } + eng, err := NewEngine(cfg) require.NoError(t, err) resp, err := values.NewMap(map[string]any{ diff --git a/core/services/workflows/graph.go b/core/services/workflows/graph.go new file mode 100644 index 00000000000..36f0b0e0966 --- /dev/null +++ b/core/services/workflows/graph.go @@ -0,0 +1,62 @@ +package workflows + +import "fmt" + +type graph[T any] struct { + // ref -> refs + adjacencies map[string]map[string]struct{} + // ref -> nodes + nodes map[string]T +} + +func (g *graph[T]) walkDo(startingRef string, f func(n T) error) error { + nodesToVisit := []string{startingRef} + for adj := range g.adjacencies[startingRef] { + nodesToVisit = append(nodesToVisit, adj) + } + + visited := map[string]struct{}{} + for { + if len(nodesToVisit) == 0 { + return nil + } + + curr := nodesToVisit[0] + nodesToVisit = nodesToVisit[1:] + if _, found := visited[curr]; found { + continue + } + + n, ok := g.nodes[curr] + if !ok { + return fmt.Errorf("could not find node with ref %s", curr) + } + visited[curr] = struct{}{} + + for adj := range g.adjacencies[curr] { + nodesToVisit = append(nodesToVisit, adj) + } + + err := f(n) + if err != nil { + return err + } + } +} + +func (g *graph[T]) adjacentNodes(ref string) []T { + refs, ok := g.adjacencies[ref] + if !ok { + return []T{} + } + + nodes := []T{} + for adjacent := range refs { + n, ok := g.nodes[adjacent] + if ok { + nodes = append(nodes, n) + } + } + + return nodes +} diff --git a/core/services/workflows/graph_test.go b/core/services/workflows/graph_test.go new file mode 100644 index 00000000000..d39e9575727 --- /dev/null +++ b/core/services/workflows/graph_test.go @@ -0,0 +1,49 @@ +package workflows + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGraph(t *testing.T) { + g := &graph[int]{ + adjacencies: map[string]map[string]struct{}{ + "node1": { + "node2": struct{}{}, + "node3": struct{}{}, + }, + "node2": { + "node3": struct{}{}, + }, + "node3": { + "node4": struct{}{}, + }, + "node4": { + "node5": struct{}{}, + }, + }, + nodes: map[string]int{ + "node1": 1, + "node2": 2, + "node3": 3, + "node4": 4, + "node5": 5, + }, + } + + inOrder := []int{} + err := g.walkDo("node1", func(n int) error { + inOrder = append(inOrder, n) + return nil + }) + require.NoError(t, err) + expected := []int{ + 1, 2, 3, 4, 5, + } + assert.ElementsMatch(t, expected, inOrder) + + got := g.adjacentNodes("node1") + assert.Equal(t, []int{2, 3}, got) +} diff --git a/core/services/workflows/queue.go b/core/services/workflows/queue.go new file mode 100644 index 00000000000..5ae9d4e4424 --- /dev/null +++ b/core/services/workflows/queue.go @@ -0,0 +1,53 @@ +package workflows + +import ( + "context" + "sync" +) + +type stepRequest struct { + stepRef string + state executionState +} + +type queue[T any] struct { + in chan T + out chan T +} + +func (q *queue[T]) worker(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + qData := []T{} + + for { + select { + case <-ctx.Done(): + return + case inc := <-q.in: + qData = append(qData, inc) + default: + if len(qData) > 0 { + popped := qData[0] + select { + case q.out <- popped: + qData = qData[1:] + default: + } + } + } + + } +} + +func (q *queue[T]) start(ctx context.Context, wg *sync.WaitGroup) { + wg.Add(1) + go q.worker(ctx, wg) +} + +func newQueue[T any]() *queue[T] { + return &queue[T]{ + in: make(chan T), + out: make(chan T), + } +} diff --git a/core/services/workflows/queue_test.go b/core/services/workflows/queue_test.go new file mode 100644 index 00000000000..d115cd69a4e --- /dev/null +++ b/core/services/workflows/queue_test.go @@ -0,0 +1,31 @@ +package workflows + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" +) + +func TestQueue(t *testing.T) { + ctx := testutils.Context(t) + q := newQueue[int]() + var wg sync.WaitGroup + q.start(ctx, &wg) + + ints := []int{1, 2, 3, 4, 5} + for _, i := range ints { + q.in <- i + } + + got := []int{} + for i := 0; i < 5; i++ { + got = append(got, <-q.out) + } + + assert.Equal(t, ints, got) + + assert.Len(t, q.out, 0) +} diff --git a/core/services/workflows/state.go b/core/services/workflows/state.go index 95b488db5b1..62eec51a835 100644 --- a/core/services/workflows/state.go +++ b/core/services/workflows/state.go @@ -9,12 +9,23 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/values" ) +const ( + statusStarted = "started" + statusErrored = "errored" + statusTimeout = "timeout" + statusCompleted = "completed" +) + type stepOutput struct { err error value values.Value } type stepState struct { + executionID string + ref string + status string + inputs *values.Map outputs *stepOutput } @@ -23,6 +34,44 @@ type executionState struct { steps map[string]*stepState executionID string workflowID string + + status string +} + +func copyState(es executionState) executionState { + steps := map[string]*stepState{} + for ref, step := range es.steps { + var mval *values.Map + if step.inputs != nil { + mp := values.Proto(step.inputs).GetMapValue() + copied := values.FromMapValueProto(mp) + mval = copied + } + + op := values.Proto(step.outputs.value) + copiedov := values.FromProto(op) + + newState := &stepState{ + executionID: step.executionID, + ref: step.ref, + status: step.status, + + outputs: &stepOutput{ + err: step.outputs.err, + value: copiedov, + }, + + inputs: mval, + } + + steps[ref] = newState + } + return executionState{ + executionID: es.executionID, + workflowID: es.workflowID, + status: es.status, + steps: steps, + } } // interpolateKey takes a multi-part, dot-separated key and attempts to replace @@ -30,7 +79,7 @@ type executionState struct { // A key is valid if: // - it contains at least two parts, with the first part being the workflow step's `ref` variable, and the second being one of `inputs` or `outputs` // - any subsequent parts will be processed as a list index (if the current element is a list) or a map key (if it's a map) -func interpolateKey(key string, state *executionState) (any, error) { +func interpolateKey(key string, state executionState) (any, error) { parts := strings.Split(key, ".") if len(parts) < 2 { @@ -100,16 +149,48 @@ var ( // findAndInterpolateAllKeys takes an `input` any value, and recursively // identifies any values that should be replaced from `state`. // A value `v` should be replaced if it is wrapped as follows `$(v)`. -func findAndInterpolateAllKeys(input any, state *executionState) (any, error) { +func findAndInterpolateAllKeys(input any, state executionState) (any, error) { + return traverse( + input, + func(el string) (any, error) { + matches := interpolationTokenRe.FindStringSubmatch(el) + if len(matches) < 2 { + return el, nil + } + + interpolatedVar := matches[1] + return interpolateKey(interpolatedVar, state) + }, + ) +} + +func findRefs(inputs map[string]any) ([]string, error) { + refs := []string{} + _, err := traverse( + inputs, + func(el string) (any, error) { + matches := interpolationTokenRe.FindStringSubmatch(el) + if len(matches) < 2 { + return el, nil + } + + m := matches[1] + parts := strings.Split(m, ".") + if len(parts) < 1 { + return nil, fmt.Errorf("invalid ref %s", m) + } + + refs = append(refs, parts[0]) + return el, nil + }, + ) + return refs, err +} + +func traverse(input any, do func(el string) (any, error)) (any, error) { switch tv := input.(type) { case string: - matches := interpolationTokenRe.FindStringSubmatch(tv) - if len(matches) < 2 { - return tv, nil - } - - interpolatedVar := matches[1] - nv, err := interpolateKey(interpolatedVar, state) + nv, err := do(tv) if err != nil { return nil, err } @@ -118,7 +199,7 @@ func findAndInterpolateAllKeys(input any, state *executionState) (any, error) { case map[string]any: nm := map[string]any{} for k, v := range tv { - nv, err := findAndInterpolateAllKeys(v, state) + nv, err := traverse(v, do) if err != nil { return nil, err } @@ -129,7 +210,7 @@ func findAndInterpolateAllKeys(input any, state *executionState) (any, error) { case []any: a := []any{} for _, el := range tv { - ne, err := findAndInterpolateAllKeys(el, state) + ne, err := traverse(el, do) if err != nil { return nil, err } @@ -139,5 +220,5 @@ func findAndInterpolateAllKeys(input any, state *executionState) (any, error) { return a, nil } - return nil, fmt.Errorf("cannot interpolate item %+v of type %T", input, input) + return nil, fmt.Errorf("cannot traverse item %+v of type %T", input, input) } diff --git a/core/services/workflows/state_test.go b/core/services/workflows/state_test.go index 9a0fadd02bd..1cde6a5b78a 100644 --- a/core/services/workflows/state_test.go +++ b/core/services/workflows/state_test.go @@ -26,14 +26,14 @@ func TestInterpolateKey(t *testing.T) { testCases := []struct { name string key string - state *executionState + state executionState expected any errMsg string }{ { name: "digging into a string", key: "evm_median.outputs.reports", - state: &executionState{ + state: executionState{ steps: map[string]*stepState{ "evm_median": { outputs: &stepOutput{ @@ -47,7 +47,7 @@ func TestInterpolateKey(t *testing.T) { { name: "ref doesn't exist", key: "evm_median.outputs.reports", - state: &executionState{ + state: executionState{ steps: map[string]*stepState{}, }, errMsg: "could not find ref `evm_median`", @@ -55,7 +55,7 @@ func TestInterpolateKey(t *testing.T) { { name: "less than 2 parts", key: "evm_median", - state: &executionState{ + state: executionState{ steps: map[string]*stepState{}, }, errMsg: "must have at least two parts", @@ -63,7 +63,7 @@ func TestInterpolateKey(t *testing.T) { { name: "second part isn't `inputs` or `outputs`", key: "evm_median.foo", - state: &executionState{ + state: executionState{ steps: map[string]*stepState{ "evm_median": { outputs: &stepOutput{ @@ -77,7 +77,7 @@ func TestInterpolateKey(t *testing.T) { { name: "outputs has errored", key: "evm_median.outputs", - state: &executionState{ + state: executionState{ steps: map[string]*stepState{ "evm_median": { outputs: &stepOutput{ @@ -91,7 +91,7 @@ func TestInterpolateKey(t *testing.T) { { name: "digging into a recursive map", key: "evm_median.outputs.reports.inner", - state: &executionState{ + state: executionState{ steps: map[string]*stepState{ "evm_median": { outputs: &stepOutput{ @@ -105,7 +105,7 @@ func TestInterpolateKey(t *testing.T) { { name: "missing key in map", key: "evm_median.outputs.reports.missing", - state: &executionState{ + state: executionState{ steps: map[string]*stepState{ "evm_median": { outputs: &stepOutput{ @@ -119,7 +119,7 @@ func TestInterpolateKey(t *testing.T) { { name: "digging into an array", key: "evm_median.outputs.reportsList.0", - state: &executionState{ + state: executionState{ steps: map[string]*stepState{ "evm_median": { outputs: &stepOutput{ @@ -133,7 +133,7 @@ func TestInterpolateKey(t *testing.T) { { name: "digging into an array that's too small", key: "evm_median.outputs.reportsList.2", - state: &executionState{ + state: executionState{ steps: map[string]*stepState{ "evm_median": { outputs: &stepOutput{ @@ -147,7 +147,7 @@ func TestInterpolateKey(t *testing.T) { { name: "digging into an array with a string key", key: "evm_median.outputs.reportsList.notAString", - state: &executionState{ + state: executionState{ steps: map[string]*stepState{ "evm_median": { outputs: &stepOutput{ @@ -161,7 +161,7 @@ func TestInterpolateKey(t *testing.T) { { name: "digging into an array with a negative index", key: "evm_median.outputs.reportsList.-1", - state: &executionState{ + state: executionState{ steps: map[string]*stepState{ "evm_median": { outputs: &stepOutput{ @@ -175,7 +175,7 @@ func TestInterpolateKey(t *testing.T) { { name: "empty element", key: "evm_median.outputs..notAString", - state: &executionState{ + state: executionState{ steps: map[string]*stepState{ "evm_median": { outputs: &stepOutput{ @@ -205,7 +205,7 @@ func TestInterpolateInputsFromState(t *testing.T) { testCases := []struct { name string inputs map[string]any - state *executionState + state executionState expected any errMsg string }{ @@ -216,7 +216,7 @@ func TestInterpolateInputsFromState(t *testing.T) { "shouldinterpolate": "$(evm_median.outputs)", }, }, - state: &executionState{ + state: executionState{ steps: map[string]*stepState{ "evm_median": { outputs: &stepOutput{ @@ -236,7 +236,7 @@ func TestInterpolateInputsFromState(t *testing.T) { inputs: map[string]any{ "foo": "bar", }, - state: &executionState{ + state: executionState{ steps: map[string]*stepState{ "evm_median": { outputs: &stepOutput{ diff --git a/core/services/workflows/store.go b/core/services/workflows/store.go new file mode 100644 index 00000000000..bb9a8d14bcc --- /dev/null +++ b/core/services/workflows/store.go @@ -0,0 +1,52 @@ +package workflows + +import ( + "context" + "fmt" + "sync" +) + +type store struct { + idToState map[string]*executionState + mu sync.RWMutex +} + +func newStore() *store { + return &store{idToState: map[string]*executionState{}} +} + +func (s *store) add(ctx context.Context, state *executionState) error { + s.mu.Lock() + defer s.mu.Unlock() + _, ok := s.idToState[state.executionID] + if ok { + return fmt.Errorf("execution ID %s already exists in store", state.executionID) + } + + s.idToState[state.executionID] = state + return nil +} + +func (s *store) updateStep(ctx context.Context, step *stepState) (executionState, error) { + s.mu.Lock() + defer s.mu.Unlock() + state, ok := s.idToState[step.executionID] + if !ok { + return executionState{}, fmt.Errorf("could not find execution %s", step.executionID) + } + + state.steps[step.ref] = step + return *state, nil +} + +func (s *store) updateStatus(ctx context.Context, executionID string, status string) error { + s.mu.Lock() + defer s.mu.Unlock() + state, ok := s.idToState[executionID] + if !ok { + return fmt.Errorf("could not find execution %s", executionID) + } + + state.status = status + return nil +} diff --git a/core/services/workflows/workflow.go b/core/services/workflows/workflow.go index bf8394af610..084a5544bfd 100644 --- a/core/services/workflows/workflow.go +++ b/core/services/workflows/workflow.go @@ -1,6 +1,13 @@ package workflows -import "gopkg.in/yaml.v3" +import ( + "fmt" + + "gopkg.in/yaml.v3" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/values" +) type Capability struct { Type string `yaml:"type"` @@ -9,15 +16,127 @@ type Capability struct { Config map[string]any `yaml:"config"` } -type Workflow struct { +type workflowSpec struct { Triggers []Capability `yaml:"triggers"` Actions []Capability `yaml:"actions"` Consensus []Capability `yaml:"consensus"` Targets []Capability `yaml:"targets"` } -func Parse(yamlWorkflow string) (*Workflow, error) { - wf := &Workflow{} - err := yaml.Unmarshal([]byte(yamlWorkflow), wf) +func (w *workflowSpec) steps() []Capability { + s := []Capability{} + s = append(s, w.Actions...) + s = append(s, w.Consensus...) + s = append(s, w.Targets...) + return s +} + +type workflow struct { + *graph[*node] + + triggers []*triggerCapability + + spec *workflowSpec +} + +type node struct { + Capability + dependencies []string + cachedCapability capabilities.CallbackExecutable + cachedConfig *values.Map +} + +type triggerCapability struct { + Capability + cachedTrigger capabilities.TriggerCapability +} + +const ( + keywordTrigger = "trigger" +) + +func Parse(yamlWorkflow string) (*workflow, error) { + wfs := &workflowSpec{} + err := yaml.Unmarshal([]byte(yamlWorkflow), wfs) + if err != nil { + return nil, err + } + + // Construct and validate the graph. We instantiate an + // empty graph with just one starting entry: `trigger`. + // This provides the starting point for our graph and + // points to all dependent nodes. + nodes := map[string]*node{ + keywordTrigger: {Capability: Capability{Ref: keywordTrigger}}, + } + adjacencies := map[string]map[string]struct{}{ + keywordTrigger: {}, + } + graph := &graph[*node]{ + adjacencies: adjacencies, + nodes: nodes, + } + for _, s := range wfs.steps() { + // For steps that don't have a ref, use + // the node's type as a default. + if s.Ref == "" { + s.Ref = s.Type + } + + _, ok := nodes[s.Ref] + if ok { + return nil, fmt.Errorf("duplicate reference %s found in workflow spec", s.Ref) + } + + nodes[s.Ref] = &node{Capability: s} + adjacencies[s.Ref] = map[string]struct{}{} + } + + for _, nd := range nodes { + refs, innerErr := findRefs(nd.Inputs) + if innerErr != nil { + return nil, innerErr + } + nd.dependencies = refs + + for _, r := range refs { + _, ok := nodes[r] + if !ok && r != keywordTrigger { + return nil, fmt.Errorf("invalid reference %s found in workflow spec", r) + } + + adjacencies[r][nd.Ref] = struct{}{} + + var found bool + innerErr := graph.walkDo(nd.Ref, func(n *node) error { + if n.Ref == r { + found = true + return nil + } + + return nil + }) + if innerErr != nil { + return nil, innerErr + } + + if found { + return nil, fmt.Errorf("found circular relationship between %s and %s", r, nd.Ref) + } + + } + } + + triggerNodes := []*triggerCapability{} + for _, t := range wfs.Triggers { + triggerNodes = append(triggerNodes, &triggerCapability{ + Capability: t, + }) + } + wf := &workflow{ + spec: wfs, + graph: graph, + triggers: triggerNodes, + } return wf, err } diff --git a/core/services/workflows/workflow_test.go b/core/services/workflows/workflow_test.go new file mode 100644 index 00000000000..81bd31cbd6f --- /dev/null +++ b/core/services/workflows/workflow_test.go @@ -0,0 +1,191 @@ +package workflows + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParse_Graph(t *testing.T) { + testCases := []struct { + name string + yaml string + graph map[string]map[string]struct{} + errMsg string + }{ + { + name: "basic example", + yaml: ` +triggers: + - type: "a-trigger" + +actions: + - type: "an-action" + ref: "an-action" + inputs: + trigger_output: $(trigger.outputs) + +consensus: + - type: "a-consensus" + ref: "a-consensus" + inputs: + trigger_output: $(trigger.outputs) + an-action_output: $(an-action.outputs) + +targets: + - type: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +`, + graph: map[string]map[string]struct{}{ + keywordTrigger: { + "an-action": struct{}{}, + "a-consensus": struct{}{}, + }, + "an-action": { + "a-consensus": struct{}{}, + }, + "a-consensus": { + "a-target": struct{}{}, + }, + "a-target": {}, + }, + }, + { + name: "circular relationship", + yaml: ` +triggers: + - type: "a-trigger" + +actions: + - type: "an-action" + ref: "an-action" + inputs: + trigger_output: $(trigger.outputs) + output: $(a-second-action.outputs) + - type: "a-second-action" + ref: "a-second-action" + inputs: + output: $(an-action.outputs) + +consensus: + - type: "a-consensus" + ref: "a-consensus" + inputs: + trigger_output: $(trigger.outputs) + an-action_output: $(an-action.outputs) + +targets: + - type: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +`, + errMsg: "found circular relationship", + }, + { + name: "indirect circular relationship", + yaml: ` +triggers: + - type: "a-trigger" + +actions: + - type: "an-action" + ref: "an-action" + inputs: + trigger_output: $(trigger.outputs) + action_output: $(a-third-action.outputs) + - type: "a-second-action" + ref: "a-second-action" + inputs: + output: $(an-action.outputs) + - type: "a-third-action" + ref: "a-third-action" + inputs: + output: $(a-second-action.outputs) + +consensus: + - type: "a-consensus" + ref: "a-consensus" + inputs: + trigger_output: $(trigger.outputs) + an-action_output: $(an-action.outputs) + +targets: + - type: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +`, + errMsg: "found circular relationship", + }, + { + name: "relationship doesn't exist", + yaml: ` +triggers: + - type: "a-trigger" + +actions: + - type: "an-action" + ref: "an-action" + inputs: + trigger_output: $(trigger.outputs) + action_output: $(missing-action.outputs) + +consensus: + - type: "a-consensus" + ref: "a-consensus" + inputs: + an-action_output: $(an-action.outputs) + +targets: + - type: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +`, + errMsg: "invalid reference missing-action found in workflow spec", + }, + { + name: "relationship doesn't exist", + yaml: ` +triggers: + - type: "a-trigger" + +actions: + - type: "an-action" + ref: "an-action" + inputs: + trigger_output: $(trigger.outputs) + action_output: $(missing-action.outputs) + +consensus: + - type: "a-consensus" + ref: "a-consensus" + inputs: + an-action_output: $(an-action.outputs) + +targets: + - type: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +`, + errMsg: "invalid reference missing-action found in workflow spec", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(st *testing.T) { + wf, err := Parse(tc.yaml) + if tc.errMsg != "" { + assert.ErrorContains(st, err, tc.errMsg) + } else { + require.NoError(st, err) + assert.Equal(st, wf.graph.adjacencies, tc.graph) + } + }) + } +} From 0cec920c7fe3868de4809bf021c0a2a22af66fc8 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Tue, 19 Mar 2024 17:02:35 +0000 Subject: [PATCH 4/6] Replace graph with graph lib --- core/scripts/go.mod | 1 + core/scripts/go.sum | 2 + core/services/workflows/engine.go | 34 ++++-- core/services/workflows/graph.go | 62 ----------- core/services/workflows/graph_test.go | 49 --------- core/services/workflows/workflow.go | 125 +++++++++++++++-------- core/services/workflows/workflow_test.go | 49 ++++----- go.mod | 1 + go.sum | 2 + integration-tests/go.mod | 1 + integration-tests/go.sum | 2 + integration-tests/load/go.mod | 1 + integration-tests/load/go.sum | 2 + 13 files changed, 135 insertions(+), 196 deletions(-) delete mode 100644 core/services/workflows/graph.go delete mode 100644 core/services/workflows/graph_test.go diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 8b8a757a76c..122abccba67 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -104,6 +104,7 @@ require ( github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/docker/distribution v2.8.2+incompatible // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/dominikbraun/graph v0.23.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/dvsekhvalnov/jose2go v1.5.0 // indirect github.com/esote/minmaxheap v1.0.0 // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 38d053c98fc..b14859a1d1c 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -363,6 +363,8 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= +github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index fa3017db6ad..f740213e627 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -193,8 +193,12 @@ func (e *Engine) loop(ctx context.Context) { case resp := <-e.callbackCh: if resp.Err != nil { e.logger.Errorf("trigger event was an error; not executing", resp.Err) - } else { - e.startExecution(ctx, resp.Value) + continue + } + + err := e.startExecution(ctx, resp.Value) + if err != nil { + e.logger.Errorf("failed to start execution: %w", err) } case dm := <-e.queue.out: <-e.newWorkerCh @@ -210,7 +214,7 @@ func (e *Engine) loop(ctx context.Context) { } } -func (e *Engine) startExecution(ctx context.Context, event values.Value) { +func (e *Engine) startExecution(ctx context.Context, event values.Value) error { executionID := uuid.New().String() e.logger.Debugw("executing on a trigger event", "event", event, "executionID", executionID) ec := &executionState{ @@ -229,14 +233,21 @@ func (e *Engine) startExecution(ctx context.Context, event values.Value) { err := e.store.add(ctx, ec) if err != nil { - return + return err } // Find the tasks we need to fire when a trigger has fired and enqueue them. - for _, node := range e.workflow.adjacentNodes(keywordTrigger) { + an, err := e.workflow.adjacentNodes(keywordTrigger) + if err != nil { + return err + } + + for _, node := range an { e.logger.Debugw("step request enqueued", "ref", node.Ref, "executionID", executionID) e.queue.in <- stepRequest{state: copyState(*ec), stepRef: node.Ref} } + + return nil } func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate stepState) error { @@ -247,7 +258,10 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate stepState) err switch stepUpdate.status { case statusCompleted: - adjacentNodes := e.workflow.adjacentNodes(stepUpdate.ref) + adjacentNodes, err := e.workflow.adjacentNodes(stepUpdate.ref) + if err != nil { + return err + } // There are no nodes left to process in the current path, so let's check if // we've completed the workflow. // If not, we'll check adjacent nodes for any that are ready to process. @@ -337,9 +351,9 @@ func (e *Engine) workerForStep(ctx context.Context, msg stepRequest) { } func (e *Engine) handleStep(ctx context.Context, msg stepRequest) (*values.Map, values.Value, error) { - node, ok := e.workflow.nodes[msg.stepRef] - if !ok { - return nil, nil, fmt.Errorf("could not get step for %s", msg.stepRef) + node, err := e.workflow.Vertex(msg.stepRef) + if err != nil { + return nil, nil, err } i, err := findAndInterpolateAllKeys(node.Inputs, msg.state) @@ -470,7 +484,6 @@ func NewEngine(cfg Config) (engine *Engine, err error) { newWorkerCh <- struct{}{} } - var wg sync.WaitGroup engine = &Engine{ logger: cfg.Lggr.Named("WorkflowEngine"), registry: cfg.Registry, @@ -480,7 +493,6 @@ func NewEngine(cfg Config) (engine *Engine, err error) { newWorkerCh: newWorkerCh, stepUpdateCh: make(chan stepState), callbackCh: make(chan capabilities.CapabilityResponse), - wg: wg, stopCh: make(chan struct{}), } return engine, nil diff --git a/core/services/workflows/graph.go b/core/services/workflows/graph.go deleted file mode 100644 index 36f0b0e0966..00000000000 --- a/core/services/workflows/graph.go +++ /dev/null @@ -1,62 +0,0 @@ -package workflows - -import "fmt" - -type graph[T any] struct { - // ref -> refs - adjacencies map[string]map[string]struct{} - // ref -> nodes - nodes map[string]T -} - -func (g *graph[T]) walkDo(startingRef string, f func(n T) error) error { - nodesToVisit := []string{startingRef} - for adj := range g.adjacencies[startingRef] { - nodesToVisit = append(nodesToVisit, adj) - } - - visited := map[string]struct{}{} - for { - if len(nodesToVisit) == 0 { - return nil - } - - curr := nodesToVisit[0] - nodesToVisit = nodesToVisit[1:] - if _, found := visited[curr]; found { - continue - } - - n, ok := g.nodes[curr] - if !ok { - return fmt.Errorf("could not find node with ref %s", curr) - } - visited[curr] = struct{}{} - - for adj := range g.adjacencies[curr] { - nodesToVisit = append(nodesToVisit, adj) - } - - err := f(n) - if err != nil { - return err - } - } -} - -func (g *graph[T]) adjacentNodes(ref string) []T { - refs, ok := g.adjacencies[ref] - if !ok { - return []T{} - } - - nodes := []T{} - for adjacent := range refs { - n, ok := g.nodes[adjacent] - if ok { - nodes = append(nodes, n) - } - } - - return nodes -} diff --git a/core/services/workflows/graph_test.go b/core/services/workflows/graph_test.go deleted file mode 100644 index d39e9575727..00000000000 --- a/core/services/workflows/graph_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package workflows - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestGraph(t *testing.T) { - g := &graph[int]{ - adjacencies: map[string]map[string]struct{}{ - "node1": { - "node2": struct{}{}, - "node3": struct{}{}, - }, - "node2": { - "node3": struct{}{}, - }, - "node3": { - "node4": struct{}{}, - }, - "node4": { - "node5": struct{}{}, - }, - }, - nodes: map[string]int{ - "node1": 1, - "node2": 2, - "node3": 3, - "node4": 4, - "node5": 5, - }, - } - - inOrder := []int{} - err := g.walkDo("node1", func(n int) error { - inOrder = append(inOrder, n) - return nil - }) - require.NoError(t, err) - expected := []int{ - 1, 2, 3, 4, 5, - } - assert.ElementsMatch(t, expected, inOrder) - - got := g.adjacentNodes("node1") - assert.Equal(t, []int{2, 3}, got) -} diff --git a/core/services/workflows/workflow.go b/core/services/workflows/workflow.go index 084a5544bfd..52392607c60 100644 --- a/core/services/workflows/workflow.go +++ b/core/services/workflows/workflow.go @@ -5,6 +5,8 @@ import ( "gopkg.in/yaml.v3" + "github.com/dominikbraun/graph" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/values" ) @@ -32,13 +34,61 @@ func (w *workflowSpec) steps() []Capability { } type workflow struct { - *graph[*node] + graph.Graph[string, *node] triggers []*triggerCapability spec *workflowSpec } +func (w *workflow) walkDo(start string, do func(n *node) error) error { + var outerErr error + err := graph.BFS(w.Graph, start, func(ref string) bool { + n, err := w.Graph.Vertex(ref) + if err != nil { + outerErr = err + return true + } + + err = do(n) + if err != nil { + outerErr = err + return true + } + + return false + }) + if err != nil { + return err + } + + return outerErr +} + +func (w *workflow) adjacentNodes(start string) ([]*node, error) { + nodes := []*node{} + m, err := w.Graph.AdjacencyMap() + if err != nil { + return nil, err + } + + adj, ok := m[start] + if !ok { + return nil, fmt.Errorf("could not find node with ref %s", start) + } + + for adjacentRef := range adj { + n, err := w.Graph.Vertex(adjacentRef) + if err != nil { + return nil, err + } + + nodes = append(nodes, n) + } + + return nodes, nil +} + type node struct { Capability dependencies []string @@ -66,64 +116,53 @@ func Parse(yamlWorkflow string) (*workflow, error) { // empty graph with just one starting entry: `trigger`. // This provides the starting point for our graph and // points to all dependent nodes. - nodes := map[string]*node{ - keywordTrigger: {Capability: Capability{Ref: keywordTrigger}}, + nodeHash := func(n *node) string { + return n.Ref } - adjacencies := map[string]map[string]struct{}{ - keywordTrigger: {}, - } - graph := &graph[*node]{ - adjacencies: adjacencies, - nodes: nodes, + g := graph.New( + nodeHash, + graph.PreventCycles(), + graph.Directed(), + ) + err = g.AddVertex(&node{ + Capability: Capability{Ref: keywordTrigger}, + }) + if err != nil { + return nil, err } + for _, s := range wfs.steps() { - // For steps that don't have a ref, use - // the node's type as a default. if s.Ref == "" { s.Ref = s.Type } - _, ok := nodes[s.Ref] - if ok { - return nil, fmt.Errorf("duplicate reference %s found in workflow spec", s.Ref) + err := g.AddVertex(&node{Capability: s}) + if err != nil { + return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, err) } + } - nodes[s.Ref] = &node{Capability: s} - adjacencies[s.Ref] = map[string]struct{}{} + nodeRefs, err := g.AdjacencyMap() + if err != nil { + return nil, err } + for nodeRef := range nodeRefs { + node, err := g.Vertex(nodeRef) + if err != nil { + return nil, err + } - for _, nd := range nodes { - refs, innerErr := findRefs(nd.Inputs) + refs, innerErr := findRefs(node.Inputs) if innerErr != nil { return nil, innerErr } - nd.dependencies = refs + node.dependencies = refs for _, r := range refs { - _, ok := nodes[r] - if !ok && r != keywordTrigger { - return nil, fmt.Errorf("invalid reference %s found in workflow spec", r) + err = g.AddEdge(r, node.Ref) + if err != nil { + return nil, err } - - adjacencies[r][nd.Ref] = struct{}{} - - var found bool - innerErr := graph.walkDo(nd.Ref, func(n *node) error { - if n.Ref == r { - found = true - return nil - } - - return nil - }) - if innerErr != nil { - return nil, innerErr - } - - if found { - return nil, fmt.Errorf("found circular relationship between %s and %s", r, nd.Ref) - } - } } @@ -135,7 +174,7 @@ func Parse(yamlWorkflow string) (*workflow, error) { } wf := &workflow{ spec: wfs, - graph: graph, + Graph: g, triggers: triggerNodes, } return wf, err diff --git a/core/services/workflows/workflow_test.go b/core/services/workflows/workflow_test.go index 81bd31cbd6f..f1ba8f0b64f 100644 --- a/core/services/workflows/workflow_test.go +++ b/core/services/workflows/workflow_test.go @@ -83,7 +83,7 @@ targets: inputs: consensus_output: $(a-consensus.outputs) `, - errMsg: "found circular relationship", + errMsg: "edge would create a cycle", }, { name: "indirect circular relationship", @@ -119,7 +119,7 @@ targets: inputs: consensus_output: $(a-consensus.outputs) `, - errMsg: "found circular relationship", + errMsg: "edge would create a cycle", }, { name: "relationship doesn't exist", @@ -146,34 +146,7 @@ targets: inputs: consensus_output: $(a-consensus.outputs) `, - errMsg: "invalid reference missing-action found in workflow spec", - }, - { - name: "relationship doesn't exist", - yaml: ` -triggers: - - type: "a-trigger" - -actions: - - type: "an-action" - ref: "an-action" - inputs: - trigger_output: $(trigger.outputs) - action_output: $(missing-action.outputs) - -consensus: - - type: "a-consensus" - ref: "a-consensus" - inputs: - an-action_output: $(an-action.outputs) - -targets: - - type: "a-target" - ref: "a-target" - inputs: - consensus_output: $(a-consensus.outputs) -`, - errMsg: "invalid reference missing-action found in workflow spec", + errMsg: "source vertex missing-action: vertex not found", }, } @@ -184,7 +157,21 @@ targets: assert.ErrorContains(st, err, tc.errMsg) } else { require.NoError(st, err) - assert.Equal(st, wf.graph.adjacencies, tc.graph) + + adjacencies, err := wf.AdjacencyMap() + require.NoError(t, err) + + got := map[string]map[string]struct{}{} + for k, v := range adjacencies { + if _, ok := got[k]; !ok { + got[k] = map[string]struct{}{} + } + for adj := range v { + got[k][adj] = struct{}{} + } + } + + assert.Equal(st, tc.graph, got, adjacencies) } }) } diff --git a/go.mod b/go.mod index 77d2b20c09e..9fc4395ab4e 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/cometbft/cometbft v0.37.2 github.com/cosmos/cosmos-sdk v0.47.4 github.com/danielkov/gin-helmet v0.0.0-20171108135313-1387e224435e + github.com/dominikbraun/graph v0.23.0 github.com/esote/minmaxheap v1.0.0 github.com/ethereum/go-ethereum v1.13.8 github.com/fatih/color v1.16.0 diff --git a/go.sum b/go.sum index b64249e2094..b33905be904 100644 --- a/go.sum +++ b/go.sum @@ -346,6 +346,8 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= +github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 098434c09f9..5f45141a6ee 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -154,6 +154,7 @@ require ( github.com/docker/docker v25.0.2+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/dominikbraun/graph v0.23.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/dvsekhvalnov/jose2go v1.5.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 47d7ec120cd..e1c0a609817 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -448,6 +448,8 @@ github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6 github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= +github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 3b90ec5bb15..3ab4e5000f1 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -131,6 +131,7 @@ require ( github.com/docker/docker v25.0.2+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/dominikbraun/graph v0.23.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/dvsekhvalnov/jose2go v1.5.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 6ef7a2bb3ec..568c6801537 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -438,6 +438,8 @@ github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6 github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= +github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= From 51616500e942bf036f66c9153f90ac4859224cf2 Mon Sep 17 00:00:00 2001 From: HenryNguyen5 <6404866+HenryNguyen5@users.noreply.github.com> Date: Tue, 19 Mar 2024 20:20:07 -0400 Subject: [PATCH 5/6] Add comments and renames --- core/services/workflows/engine.go | 127 ++++++++++++++++---------- core/services/workflows/queue.go | 14 +-- core/services/workflows/queue_test.go | 6 +- core/services/workflows/state.go | 60 ++++++++---- core/services/workflows/store.go | 3 + core/services/workflows/workflow.go | 53 ++++++----- 6 files changed, 163 insertions(+), 100 deletions(-) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index f740213e627..5e97316c4e3 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -22,16 +22,20 @@ const ( mockedTriggerID = "cccccccccc0000000000000000000000" ) +// Engine handles the lifecycle of a single workflow and its executions. type Engine struct { services.StateMachine logger logger.Logger registry types.CapabilitiesRegistry workflow *workflow - store *store - queue *queue[stepRequest] - callbackCh chan capabilities.CapabilityResponse + executionStates *store + // NOTE: I do find it confusing that pending step requests are global rather than scoped to a single execution + pendingStepRequests *queue[stepRequest] + triggerEvents chan capabilities.CapabilityResponse newWorkerCh chan struct{} stepUpdateCh chan stepState + // wg is only used to make sure that in the case of a shutdown, + // we wait for all pending steps to finish. wg sync.WaitGroup stopCh services.StopChan } @@ -43,7 +47,7 @@ func (e *Engine) Start(ctx context.Context) error { // queue.start will add to the wg and // spin off a goroutine. - e.queue.start(ctx, &e.wg) + e.pendingStepRequests.start(ctx, &e.wg) e.wg.Add(2) go e.init(ctx) @@ -53,6 +57,13 @@ func (e *Engine) Start(ctx context.Context) error { }) } +// init does the following: +// +// 1. Resolves the underlying capability for each trigger +// 2. Registers each step's capability to this workflow +// 3. Registers for trigger events now that all capabilities are resolved +// +// Steps 1 and 2 are retried every 5 seconds until successful. func (e *Engine) init(ctx context.Context) { defer e.wg.Done() @@ -67,6 +78,7 @@ LOOP: case <-ctx.Done(): return case <-ticker.C: + // Resolve the underlying capability for each trigger for _, t := range e.workflow.triggers { cp, err := e.registry.GetTrigger(ctx, t.Type) if err != nil { @@ -77,13 +89,15 @@ LOOP: } } - err := e.workflow.walkDo(keywordTrigger, func(n *node) error { - // The graph contains a dummy node for triggers, but + // Walk the graph and register each step's capablity to this workflow + err := e.workflow.walkDo(keywordTrigger, func(n *step) error { + // The graph contains a dummy step for triggers, but // we handle triggers separately since there might be more than one. if n.Ref == keywordTrigger { return nil } + // If the capability is already cached, that means we've already registered it if n.cachedCapability != nil { return nil } @@ -93,6 +107,7 @@ LOOP: return fmt.Errorf("failed to get capability with ref %s: %s, retrying in %d seconds", n.Type, innerErr, retrySec) } + // We only support CallbackExecutable capabilities for now cc, ok := cp.(capabilities.CallbackExecutable) if !ok { return fmt.Errorf("could not coerce capability %s to CallbackExecutable", n.Type) @@ -143,6 +158,7 @@ LOOP: e.logger.Info("engine initialized") } +// registerTrigger is used during the initialization phase to bind a trigger to this workflow func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) error { triggerInputs, err := values.NewMap( map[string]any{ @@ -165,7 +181,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) erro Config: tc, Inputs: triggerInputs, } - err = t.cachedTrigger.RegisterTrigger(ctx, e.callbackCh, triggerRegRequest) + err = t.cachedTrigger.RegisterTrigger(ctx, e.triggerEvents, triggerRegRequest) if err != nil { return fmt.Errorf("failed to instantiate trigger %s, %s", t.Type, err) } @@ -173,14 +189,17 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) erro } // loop is the synchronization goroutine for the engine, and is responsible for: -// - dispatching new workers up to the limit specified (default = 100) -// - starting a new execution when a trigger emits a message on `callbackCh` -// - updating the `executionState` with the outcome of a `step`. +// - dispatching new workers up to the limit specified (default = 100) +// - starting a new execution when a trigger emits a message on `triggerEvents` +// - updating the `executionState` with the outcome of a `step`. // // Note: `executionState` is only mutated by this loop directly. +// // This is important to avoid data races, and any accesses of `executionState` by any other // goroutine should happen via a `stepRequest` message containing a copy of the latest -// `executionState`. This works because a worker thread for a given step will only +// `executionState`. +// +// This works because a worker thread for a given step will only // be spun up once all dependent steps have completed (guaranteeing that the state associated // with those dependent steps will no longer change). Therefore as long this worker thread only // accesses data from dependent states, the data will never be stale. @@ -190,7 +209,7 @@ func (e *Engine) loop(ctx context.Context) { select { case <-ctx.Done(): return - case resp := <-e.callbackCh: + case resp := <-e.triggerEvents: if resp.Err != nil { e.logger.Errorf("trigger event was an error; not executing", resp.Err) continue @@ -200,10 +219,13 @@ func (e *Engine) loop(ctx context.Context) { if err != nil { e.logger.Errorf("failed to start execution: %w", err) } - case dm := <-e.queue.out: + case stepRequest := <-e.pendingStepRequests.dequeue: + // Wait for a new worker to be available before dispatching a new one. <-e.newWorkerCh + // NOTE: Can we add this to e.workerForStep instead? e.wg.Add(1) - go e.workerForStep(ctx, dm) + // NOTE: Should we instead add a "process" method to the queue, and do concurrency control there? + go e.workerForStepRequest(ctx, stepRequest) case stepUpdate := <-e.stepUpdateCh: // Executed synchronously to ensure we correctly schedule subsequent tasks. err := e.handleStepUpdate(ctx, stepUpdate) @@ -214,6 +236,7 @@ func (e *Engine) loop(ctx context.Context) { } } +// startExecution kicks off a new workflow execution when a trigger event is received. func (e *Engine) startExecution(ctx context.Context, event values.Value) error { executionID := uuid.New().String() e.logger.Debugw("executing on a trigger event", "event", event, "executionID", executionID) @@ -231,44 +254,45 @@ func (e *Engine) startExecution(ctx context.Context, event values.Value) error { status: statusStarted, } - err := e.store.add(ctx, ec) + err := e.executionStates.add(ctx, ec) if err != nil { return err } // Find the tasks we need to fire when a trigger has fired and enqueue them. - an, err := e.workflow.adjacentNodes(keywordTrigger) + triggerDependents, err := e.workflow.dependents(keywordTrigger) if err != nil { return err } - for _, node := range an { - e.logger.Debugw("step request enqueued", "ref", node.Ref, "executionID", executionID) - e.queue.in <- stepRequest{state: copyState(*ec), stepRef: node.Ref} + for _, step := range triggerDependents { + e.logger.Debugw("step request enqueued", "ref", step.Ref, "executionID", executionID) + e.pendingStepRequests.enqueue <- stepRequest{state: copyState(*ec), stepRef: step.Ref} } return nil } func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate stepState) error { - state, err := e.store.updateStep(ctx, &stepUpdate) + state, err := e.executionStates.updateStep(ctx, &stepUpdate) if err != nil { return err } switch stepUpdate.status { case statusCompleted: - adjacentNodes, err := e.workflow.adjacentNodes(stepUpdate.ref) + stepDependents, err := e.workflow.dependents(stepUpdate.ref) if err != nil { return err } - // There are no nodes left to process in the current path, so let's check if + // There are no steps left to process in the current path, so let's check if // we've completed the workflow. - // If not, we'll check adjacent nodes for any that are ready to process. - if len(adjacentNodes) == 0 { + // If not, we'll check for any dependents that are ready to process. + if len(stepDependents) == 0 { workflowCompleted := true - err := e.workflow.walkDo(keywordTrigger, func(n *node) error { + err := e.workflow.walkDo(keywordTrigger, func(n *step) error { step, ok := state.steps[n.Ref] + // Note: Why do we not return an error if !ok? if !ok { workflowCompleted = false return nil @@ -286,35 +310,38 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate stepState) err } if workflowCompleted { - err := e.store.updateStatus(ctx, state.executionID, statusCompleted) + err := e.executionStates.updateStatus(ctx, state.executionID, statusCompleted) if err != nil { return err } } } - for _, node := range adjacentNodes { - var anyNotCompleted bool - for _, dr := range node.dependencies { - step, ok := state.steps[dr] + for _, step := range stepDependents { + // Check if all dependencies are completed for the current step + var waitingOnDependencies bool + for _, dr := range step.dependencies { + stepState, ok := state.steps[dr] if !ok { return fmt.Errorf("could not locate dependency %s in %+v", dr, state) } - if step.status != statusCompleted { - anyNotCompleted = true + // NOTE: Should we also check for statusErrored? + if stepState.status != statusCompleted { + waitingOnDependencies = true } } - if !anyNotCompleted { - e.queue.in <- stepRequest{ + // If all dependencies are completed, enqueue the step. + if !waitingOnDependencies { + e.pendingStepRequests.enqueue <- stepRequest{ state: copyState(state), - stepRef: node.Ref, + stepRef: step.Ref, } } } case statusErrored: - err := e.store.updateStatus(ctx, state.executionID, statusErrored) + err := e.executionStates.updateStatus(ctx, state.executionID, statusErrored) if err != nil { return err } @@ -323,7 +350,8 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate stepState) err return nil } -func (e *Engine) workerForStep(ctx context.Context, msg stepRequest) { +// NOTE: Should this be attached to a step struct instead of the engine? +func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { defer e.wg.Done() e.logger.Debugw("executing on a step event", "event", msg, "executionID", msg.state.executionID) @@ -333,7 +361,7 @@ func (e *Engine) workerForStep(ctx context.Context, msg stepRequest) { ref: msg.stepRef, } - inputs, outputs, err := e.handleStep(ctx, msg) + inputs, outputs, err := e.executeStep(ctx, msg) if err != nil { e.logger.Errorf("error executing step request: %w", err, "executionID", msg.state.executionID, "stepRef", msg.stepRef) stepState.outputs.err = err @@ -350,13 +378,14 @@ func (e *Engine) workerForStep(ctx context.Context, msg stepRequest) { e.newWorkerCh <- struct{}{} } -func (e *Engine) handleStep(ctx context.Context, msg stepRequest) (*values.Map, values.Value, error) { - node, err := e.workflow.Vertex(msg.stepRef) +// executeStep executes the referenced capability within a step and returns the result. +func (e *Engine) executeStep(ctx context.Context, msg stepRequest) (*values.Map, values.Value, error) { + step, err := e.workflow.Vertex(msg.stepRef) if err != nil { return nil, nil, err } - i, err := findAndInterpolateAllKeys(node.Inputs, msg.state) + i, err := findAndInterpolateAllKeys(step.Inputs, msg.state) if err != nil { return nil, nil, err } @@ -368,14 +397,14 @@ func (e *Engine) handleStep(ctx context.Context, msg stepRequest) (*values.Map, tr := capabilities.CapabilityRequest{ Inputs: inputs, - Config: node.cachedConfig, + Config: step.cachedConfig, Metadata: capabilities.RequestMetadata{ WorkflowID: msg.state.workflowID, WorkflowExecutionID: msg.state.executionID, }, } - resp, err := capabilities.ExecuteSync(ctx, node.cachedCapability, tr) + resp, err := capabilities.ExecuteSync(ctx, step.cachedCapability, tr) if err != nil { return inputs, nil, err } @@ -389,7 +418,7 @@ func (e *Engine) handleStep(ctx context.Context, msg stepRequest) (*values.Map, return inputs, resp.Underlying[0], err } -func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability) error { +func (e *Engine) deregisterTrigger(_ context.Context, t *triggerCapability) error { triggerInputs, err := values.NewMap( map[string]any{ "triggerId": mockedTriggerID, @@ -424,7 +453,7 @@ func (e *Engine) Close() error { close(e.stopCh) e.wg.Wait() - err := e.workflow.walkDo(keywordTrigger, func(n *node) error { + err := e.workflow.walkDo(keywordTrigger, func(n *step) error { if n.Ref == keywordTrigger { return nil } @@ -468,7 +497,7 @@ func NewEngine(cfg Config) (engine *Engine, err error) { } // TODO: validation of the workflow spec // We'll need to check, among other things: - // - that there are no node `ref` called `trigger` as this is reserved for any triggers + // - that there are no step `ref` called `trigger` as this is reserved for any triggers // - that there are no duplicate `ref`s // - that the `ref` for any triggers is empty -- and filled in with `trigger` // - etc. @@ -488,11 +517,11 @@ func NewEngine(cfg Config) (engine *Engine, err error) { logger: cfg.Lggr.Named("WorkflowEngine"), registry: cfg.Registry, workflow: workflow, - store: newStore(), - queue: newQueue[stepRequest](), + executionStates: newStore(), + pendingStepRequests: newQueue[stepRequest](), newWorkerCh: newWorkerCh, stepUpdateCh: make(chan stepState), - callbackCh: make(chan capabilities.CapabilityResponse), + triggerEvents: make(chan capabilities.CapabilityResponse), stopCh: make(chan struct{}), } return engine, nil diff --git a/core/services/workflows/queue.go b/core/services/workflows/queue.go index 5ae9d4e4424..8ce23b0d529 100644 --- a/core/services/workflows/queue.go +++ b/core/services/workflows/queue.go @@ -11,26 +11,26 @@ type stepRequest struct { } type queue[T any] struct { - in chan T - out chan T + enqueue chan T + dequeue chan T } func (q *queue[T]) worker(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - + // NOTE: Should there be a max size for the queue? qData := []T{} for { select { case <-ctx.Done(): return - case inc := <-q.in: + case inc := <-q.enqueue: qData = append(qData, inc) default: if len(qData) > 0 { popped := qData[0] select { - case q.out <- popped: + case q.dequeue <- popped: qData = qData[1:] default: } @@ -47,7 +47,7 @@ func (q *queue[T]) start(ctx context.Context, wg *sync.WaitGroup) { func newQueue[T any]() *queue[T] { return &queue[T]{ - in: make(chan T), - out: make(chan T), + enqueue: make(chan T), + dequeue: make(chan T), } } diff --git a/core/services/workflows/queue_test.go b/core/services/workflows/queue_test.go index d115cd69a4e..c5cf5efe781 100644 --- a/core/services/workflows/queue_test.go +++ b/core/services/workflows/queue_test.go @@ -17,15 +17,15 @@ func TestQueue(t *testing.T) { ints := []int{1, 2, 3, 4, 5} for _, i := range ints { - q.in <- i + q.enqueue <- i } got := []int{} for i := 0; i < 5; i++ { - got = append(got, <-q.out) + got = append(got, <-q.dequeue) } assert.Equal(t, ints, got) - assert.Len(t, q.out, 0) + assert.Len(t, q.dequeue, 0) } diff --git a/core/services/workflows/state.go b/core/services/workflows/state.go index 62eec51a835..25c573ed7d9 100644 --- a/core/services/workflows/state.go +++ b/core/services/workflows/state.go @@ -38,14 +38,14 @@ type executionState struct { status string } +// copyState returns a deep copy of the input executionState func copyState(es executionState) executionState { steps := map[string]*stepState{} for ref, step := range es.steps { var mval *values.Map if step.inputs != nil { mp := values.Proto(step.inputs).GetMapValue() - copied := values.FromMapValueProto(mp) - mval = copied + mval = values.FromMapValueProto(mp) } op := values.Proto(step.outputs.value) @@ -76,9 +76,14 @@ func copyState(es executionState) executionState { // interpolateKey takes a multi-part, dot-separated key and attempts to replace // it with its corresponding value in `state`. -// A key is valid if: -// - it contains at least two parts, with the first part being the workflow step's `ref` variable, and the second being one of `inputs` or `outputs` -// - any subsequent parts will be processed as a list index (if the current element is a list) or a map key (if it's a map) +// +// A key is valid if it contains at least two parts, with: +// - the first part being the workflow step's `ref` variable +// - the second part being one of `inputs` or `outputs` +// +// If a key has more than two parts, then we traverse the parts +// to find the value we want to replace. +// We support traversing both nested maps and lists and any combination of the two. func interpolateKey(key string, state executionState) (any, error) { parts := strings.Split(key, ".") @@ -86,6 +91,7 @@ func interpolateKey(key string, state executionState) (any, error) { return "", fmt.Errorf("cannot interpolate %s: must have at least two parts", key) } + // lookup the step we want to get either input or output state from sc, ok := state.steps[parts[0]] if !ok { return "", fmt.Errorf("could not find ref `%s`", parts[0]) @@ -94,6 +100,7 @@ func interpolateKey(key string, state executionState) (any, error) { var value values.Value switch parts[1] { case "inputs": + // Can this error? What happens if this input depends on a previous step's output? value = sc.inputs case "outputs": if sc.outputs.err != nil { @@ -116,24 +123,22 @@ func interpolateKey(key string, state executionState) (any, error) { case map[string]any: inner, ok := v[r] if !ok { + // probably worthwhile to print the entire reference here return "", fmt.Errorf("could not find ref part `%s` in `%+v`", r, v) } val = inner case []any: - d, err := strconv.Atoi(r) + i, err := strconv.Atoi(r) if err != nil { return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`: `%s` is not convertible to an int", r, v, r) } - if d > len(v)-1 { - return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`: cannot fetch index %d", r, v, d) + if (i > len(v)-1) || (i < 0){ + return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`: index out of bounds %d", r, v, i) } - if d < 0 { - return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`: index %d must be a positive number", r, v, d) - } - val = v[d] + val = v[i] default: return "", fmt.Errorf("could not interpolate ref part `%s` in `%+v`", r, val) } @@ -148,9 +153,10 @@ var ( // findAndInterpolateAllKeys takes an `input` any value, and recursively // identifies any values that should be replaced from `state`. -// A value `v` should be replaced if it is wrapped as follows `$(v)`. +// +// A value `v` should be replaced if it is wrapped as follows: `$(v)`. func findAndInterpolateAllKeys(input any, state executionState) (any, error) { - return traverse( + return deepMap( input, func(el string) (any, error) { matches := interpolationTokenRe.FindStringSubmatch(el) @@ -164,10 +170,17 @@ func findAndInterpolateAllKeys(input any, state executionState) (any, error) { ) } +// findRefs takes an `inputs` map and returns a list of all the step references +// contained within it. func findRefs(inputs map[string]any) ([]string, error) { refs := []string{} - _, err := traverse( + _, err := deepMap( inputs, + // This function is called for each string in the map + // for each string, we iterate over each match of the interpolation token + // - if there are no matches, return no reference + // - if there is one match, return the reference + // - if there are multiple matches (in the case of a multi-part state reference), return just the step ref func(el string) (any, error) { matches := interpolationTokenRe.FindStringSubmatch(el) if len(matches) < 2 { @@ -187,10 +200,19 @@ func findRefs(inputs map[string]any) ([]string, error) { return refs, err } -func traverse(input any, do func(el string) (any, error)) (any, error) { +// deepMap recursively applies a transformation function +// over each string within: +// +// - a map[string]any +// - a []any +// - a string +func deepMap(input any, transform func(el string) (any, error)) (any, error) { + // in the case of a string, simply apply the transformation + // in the case of a map, recurse and apply the transformation to each value + // in the case of a list, recurse and apply the transformation to each element switch tv := input.(type) { case string: - nv, err := do(tv) + nv, err := transform(tv) if err != nil { return nil, err } @@ -199,7 +221,7 @@ func traverse(input any, do func(el string) (any, error)) (any, error) { case map[string]any: nm := map[string]any{} for k, v := range tv { - nv, err := traverse(v, do) + nv, err := deepMap(v, transform) if err != nil { return nil, err } @@ -210,7 +232,7 @@ func traverse(input any, do func(el string) (any, error)) (any, error) { case []any: a := []any{} for _, el := range tv { - ne, err := traverse(el, do) + ne, err := deepMap(el, transform) if err != nil { return nil, err } diff --git a/core/services/workflows/store.go b/core/services/workflows/store.go index bb9a8d14bcc..4527e2b4f16 100644 --- a/core/services/workflows/store.go +++ b/core/services/workflows/store.go @@ -15,6 +15,7 @@ func newStore() *store { return &store{idToState: map[string]*executionState{}} } +// add adds a new execution state under the given executionID func (s *store) add(ctx context.Context, state *executionState) error { s.mu.Lock() defer s.mu.Unlock() @@ -27,6 +28,7 @@ func (s *store) add(ctx context.Context, state *executionState) error { return nil } +// updateStep updates a step for the given executionID func (s *store) updateStep(ctx context.Context, step *stepState) (executionState, error) { s.mu.Lock() defer s.mu.Unlock() @@ -39,6 +41,7 @@ func (s *store) updateStep(ctx context.Context, step *stepState) (executionState return *state, nil } +// updateStatus updates the status for the given executionID func (s *store) updateStatus(ctx context.Context, executionID string, status string) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/core/services/workflows/workflow.go b/core/services/workflows/workflow.go index 52392607c60..21d734553d0 100644 --- a/core/services/workflows/workflow.go +++ b/core/services/workflows/workflow.go @@ -11,37 +11,45 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/values" ) -type Capability struct { +// yamlStep is the yaml parsed representation of a step in a workflow. +type yamlStep struct { Type string `yaml:"type"` Ref string `yaml:"ref"` Inputs map[string]any `yaml:"inputs"` Config map[string]any `yaml:"config"` } -type workflowSpec struct { - Triggers []Capability `yaml:"triggers"` - Actions []Capability `yaml:"actions"` - Consensus []Capability `yaml:"consensus"` - Targets []Capability `yaml:"targets"` +// yamlWorkflowSpec is the yaml parsed representation of a workflow. +type yamlWorkflowSpec struct { + Triggers []yamlStep `yaml:"triggers"` + Actions []yamlStep `yaml:"actions"` + Consensus []yamlStep `yaml:"consensus"` + Targets []yamlStep `yaml:"targets"` } -func (w *workflowSpec) steps() []Capability { - s := []Capability{} +func (w *yamlWorkflowSpec) steps() []yamlStep { + s := []yamlStep{} s = append(s, w.Actions...) s = append(s, w.Consensus...) s = append(s, w.Targets...) return s } +// workflow is a directed graph of nodes, where each node is a step. +// +// triggers are special steps that are stored separately, they're +// treated differently due to their nature of being the starting +// point of a workflow. type workflow struct { - graph.Graph[string, *node] + graph.Graph[string, *step] triggers []*triggerCapability - spec *workflowSpec + spec *yamlWorkflowSpec } -func (w *workflow) walkDo(start string, do func(n *node) error) error { +// NOTE: should we make this concurrent? +func (w *workflow) walkDo(start string, do func(n *step) error) error { var outerErr error err := graph.BFS(w.Graph, start, func(ref string) bool { n, err := w.Graph.Vertex(ref) @@ -65,8 +73,8 @@ func (w *workflow) walkDo(start string, do func(n *node) error) error { return outerErr } -func (w *workflow) adjacentNodes(start string) ([]*node, error) { - nodes := []*node{} +func (w *workflow) dependents(start string) ([]*step, error) { + nodes := []*step{} m, err := w.Graph.AdjacencyMap() if err != nil { return nil, err @@ -89,15 +97,16 @@ func (w *workflow) adjacentNodes(start string) ([]*node, error) { return nodes, nil } -type node struct { - Capability +// step wraps a yamlStep with additional context for dependencies and execution +type step struct { + yamlStep dependencies []string cachedCapability capabilities.CallbackExecutable cachedConfig *values.Map } type triggerCapability struct { - Capability + yamlStep cachedTrigger capabilities.TriggerCapability } @@ -106,7 +115,7 @@ const ( ) func Parse(yamlWorkflow string) (*workflow, error) { - wfs := &workflowSpec{} + wfs := &yamlWorkflowSpec{} err := yaml.Unmarshal([]byte(yamlWorkflow), wfs) if err != nil { return nil, err @@ -116,7 +125,7 @@ func Parse(yamlWorkflow string) (*workflow, error) { // empty graph with just one starting entry: `trigger`. // This provides the starting point for our graph and // points to all dependent nodes. - nodeHash := func(n *node) string { + nodeHash := func(n *step) string { return n.Ref } g := graph.New( @@ -124,8 +133,8 @@ func Parse(yamlWorkflow string) (*workflow, error) { graph.PreventCycles(), graph.Directed(), ) - err = g.AddVertex(&node{ - Capability: Capability{Ref: keywordTrigger}, + err = g.AddVertex(&step{ + yamlStep: yamlStep{Ref: keywordTrigger}, }) if err != nil { return nil, err @@ -136,7 +145,7 @@ func Parse(yamlWorkflow string) (*workflow, error) { s.Ref = s.Type } - err := g.AddVertex(&node{Capability: s}) + err := g.AddVertex(&step{yamlStep: s}) if err != nil { return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, err) } @@ -169,7 +178,7 @@ func Parse(yamlWorkflow string) (*workflow, error) { triggerNodes := []*triggerCapability{} for _, t := range wfs.Triggers { triggerNodes = append(triggerNodes, &triggerCapability{ - Capability: t, + yamlStep: t, }) } wf := &workflow{ From 33eef02be2ae3f0d4872ea4478107a7ce96d9d6f Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Tue, 19 Mar 2024 17:02:35 +0000 Subject: [PATCH 6/6] Replace graph with graph lib --- core/services/workflows/delegate.go | 24 +++---- core/services/workflows/engine.go | 87 ++++++++++++------------ core/services/workflows/queue.go | 53 --------------- core/services/workflows/queue_test.go | 31 --------- core/services/workflows/store.go | 15 ++-- core/services/workflows/workflow.go | 75 ++++++++++++-------- core/services/workflows/workflow_test.go | 38 +++++++++++ 7 files changed, 150 insertions(+), 173 deletions(-) delete mode 100644 core/services/workflows/queue.go delete mode 100644 core/services/workflows/queue_test.go diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index c1bff37a33f..2c95b478709 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -26,39 +26,39 @@ triggers: consensus: - type: "offchain_reporting" - ref: evm_median + ref: "evm_median" inputs: observations: - - $(trigger.outputs) + - "$(trigger.outputs)" config: - aggregation_method: data_feeds_2_0 + aggregation_method: "data_feeds_2_0" aggregation_config: - 0x1111111111111111111100000000000000000000000000000000000000000000: + "0x1111111111111111111100000000000000000000000000000000000000000000": deviation: "0.001" heartbeat: "30m" - 0x2222222222222222222200000000000000000000000000000000000000000000: + "0x2222222222222222222200000000000000000000000000000000000000000000": deviation: "0.001" heartbeat: "30m" - 0x3333333333333333333300000000000000000000000000000000000000000000: + "0x3333333333333333333300000000000000000000000000000000000000000000": deviation: "0.001" heartbeat: "30m" - encoder: EVM + encoder: "EVM" encoder_config: abi: "mercury_reports bytes[]" targets: - - type: write_polygon-testnet-mumbai + - type: "write_polygon-testnet-mumbai" inputs: report: - - $(evm_median.outputs.reports) + - "$(evm_median.outputs.reports)" config: address: "0x3F3554832c636721F1fD1822Ccca0354576741Ef" - params: [($inputs.report)] + params: ["$(inputs.report)"] abi: "receive(report bytes)" - - type: write_ethereum-testnet-sepolia + - type: "write_ethereum-testnet-sepolia" inputs: report: - - $(evm_median.outputs.reports) + - "$(evm_median.outputs.reports)" config: address: "0x54e220867af6683aE6DcBF535B4f952cB5116510" params: ["$(inputs.report)"] diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index f740213e627..b36f1199ddc 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -24,16 +24,16 @@ const ( type Engine struct { services.StateMachine - logger logger.Logger - registry types.CapabilitiesRegistry - workflow *workflow - store *store - queue *queue[stepRequest] - callbackCh chan capabilities.CapabilityResponse - newWorkerCh chan struct{} - stepUpdateCh chan stepState - wg sync.WaitGroup - stopCh services.StopChan + logger logger.Logger + registry types.CapabilitiesRegistry + workflow *workflow + store *inMemoryStore + queue chan stepRequest + triggerEvents chan capabilities.CapabilityResponse + newWorkerCh chan struct{} + stepUpdateCh chan stepState + wg sync.WaitGroup + stopCh services.StopChan } func (e *Engine) Start(ctx context.Context) error { @@ -41,10 +41,6 @@ func (e *Engine) Start(ctx context.Context) error { // create a new context, since the one passed in via Start is short-lived. ctx, _ := e.stopCh.NewCtx() - // queue.start will add to the wg and - // spin off a goroutine. - e.queue.start(ctx, &e.wg) - e.wg.Add(2) go e.init(ctx) go e.loop(ctx) @@ -68,13 +64,14 @@ LOOP: return case <-ticker.C: for _, t := range e.workflow.triggers { - cp, err := e.registry.GetTrigger(ctx, t.Type) + tg, err := e.registry.GetTrigger(ctx, t.Type) if err != nil { initSuccessful = false e.logger.Errorf("failed to get trigger capability: %s, retrying in %d seconds", err, retrySec) - } else { - t.cachedTrigger = cp + continue } + + t.trigger = tg } err := e.workflow.walkDo(keywordTrigger, func(n *node) error { @@ -84,7 +81,7 @@ LOOP: return nil } - if n.cachedCapability != nil { + if n.capability != nil { return nil } @@ -98,19 +95,19 @@ LOOP: return fmt.Errorf("could not coerce capability %s to CallbackExecutable", n.Type) } - if n.cachedConfig == nil { + if n.config == nil { configMap, ierr := values.NewMap(n.Config) - if innerErr != nil { + if ierr != nil { return fmt.Errorf("failed to convert config to values.Map: %s", ierr) } - n.cachedConfig = configMap + n.config = configMap } reg := capabilities.RegisterToWorkflowRequest{ Metadata: capabilities.RegistrationMetadata{ WorkflowID: mockedWorkflowID, }, - Config: n.cachedConfig, + Config: n.config, } innerErr = cc.RegisterToWorkflow(ctx, reg) @@ -118,7 +115,7 @@ LOOP: return fmt.Errorf("failed to register to workflow: %+v", reg) } - n.cachedCapability = cc + n.capability = cc return nil }) if err != nil { @@ -165,7 +162,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) erro Config: tc, Inputs: triggerInputs, } - err = t.cachedTrigger.RegisterTrigger(ctx, e.callbackCh, triggerRegRequest) + err = t.trigger.RegisterTrigger(ctx, e.triggerEvents, triggerRegRequest) if err != nil { return fmt.Errorf("failed to instantiate trigger %s, %s", t.Type, err) } @@ -190,7 +187,7 @@ func (e *Engine) loop(ctx context.Context) { select { case <-ctx.Done(): return - case resp := <-e.callbackCh: + case resp := <-e.triggerEvents: if resp.Err != nil { e.logger.Errorf("trigger event was an error; not executing", resp.Err) continue @@ -200,7 +197,7 @@ func (e *Engine) loop(ctx context.Context) { if err != nil { e.logger.Errorf("failed to start execution: %w", err) } - case dm := <-e.queue.out: + case dm := <-e.queue: <-e.newWorkerCh e.wg.Add(1) go e.workerForStep(ctx, dm) @@ -244,7 +241,7 @@ func (e *Engine) startExecution(ctx context.Context, event values.Value) error { for _, node := range an { e.logger.Debugw("step request enqueued", "ref", node.Ref, "executionID", executionID) - e.queue.in <- stepRequest{state: copyState(*ec), stepRef: node.Ref} + e.queue <- stepRequest{state: copyState(*ec), stepRef: node.Ref} } return nil @@ -307,7 +304,7 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate stepState) err } if !anyNotCompleted { - e.queue.in <- stepRequest{ + e.queue <- stepRequest{ state: copyState(state), stepRef: node.Ref, } @@ -368,14 +365,14 @@ func (e *Engine) handleStep(ctx context.Context, msg stepRequest) (*values.Map, tr := capabilities.CapabilityRequest{ Inputs: inputs, - Config: node.cachedConfig, + Config: node.config, Metadata: capabilities.RequestMetadata{ WorkflowID: msg.state.workflowID, WorkflowExecutionID: msg.state.executionID, }, } - resp, err := capabilities.ExecuteSync(ctx, node.cachedCapability, tr) + resp, err := capabilities.ExecuteSync(ctx, node.capability, tr) if err != nil { return inputs, nil, err } @@ -404,7 +401,7 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability) er }, Inputs: triggerInputs, } - return t.cachedTrigger.UnregisterTrigger(context.Background(), deregRequest) + return t.trigger.UnregisterTrigger(context.Background(), deregRequest) } func (e *Engine) Close() error { @@ -433,10 +430,10 @@ func (e *Engine) Close() error { Metadata: capabilities.RegistrationMetadata{ WorkflowID: mockedWorkflowID, }, - Config: n.cachedConfig, + Config: n.config, } - innerErr := n.cachedCapability.UnregisterFromWorkflow(ctx, reg) + innerErr := n.capability.UnregisterFromWorkflow(ctx, reg) if innerErr != nil { return fmt.Errorf("failed to unregister from workflow: %+v", reg) } @@ -456,16 +453,22 @@ type Config struct { Lggr logger.Logger Registry types.CapabilitiesRegistry MaxWorkerLimit int + QueueSize int } const ( defaultWorkerLimit = 100 + defaultQueueSize = 100000 ) func NewEngine(cfg Config) (engine *Engine, err error) { if cfg.MaxWorkerLimit == 0 { cfg.MaxWorkerLimit = defaultWorkerLimit } + + if cfg.QueueSize == 0 { + cfg.QueueSize = defaultQueueSize + } // TODO: validation of the workflow spec // We'll need to check, among other things: // - that there are no node `ref` called `trigger` as this is reserved for any triggers @@ -485,15 +488,15 @@ func NewEngine(cfg Config) (engine *Engine, err error) { } engine = &Engine{ - logger: cfg.Lggr.Named("WorkflowEngine"), - registry: cfg.Registry, - workflow: workflow, - store: newStore(), - queue: newQueue[stepRequest](), - newWorkerCh: newWorkerCh, - stepUpdateCh: make(chan stepState), - callbackCh: make(chan capabilities.CapabilityResponse), - stopCh: make(chan struct{}), + logger: cfg.Lggr.Named("WorkflowEngine"), + registry: cfg.Registry, + workflow: workflow, + store: newInMemoryStore(), + queue: make(chan stepRequest, cfg.QueueSize), + newWorkerCh: newWorkerCh, + stepUpdateCh: make(chan stepState), + triggerEvents: make(chan capabilities.CapabilityResponse), + stopCh: make(chan struct{}), } return engine, nil } diff --git a/core/services/workflows/queue.go b/core/services/workflows/queue.go deleted file mode 100644 index 5ae9d4e4424..00000000000 --- a/core/services/workflows/queue.go +++ /dev/null @@ -1,53 +0,0 @@ -package workflows - -import ( - "context" - "sync" -) - -type stepRequest struct { - stepRef string - state executionState -} - -type queue[T any] struct { - in chan T - out chan T -} - -func (q *queue[T]) worker(ctx context.Context, wg *sync.WaitGroup) { - defer wg.Done() - - qData := []T{} - - for { - select { - case <-ctx.Done(): - return - case inc := <-q.in: - qData = append(qData, inc) - default: - if len(qData) > 0 { - popped := qData[0] - select { - case q.out <- popped: - qData = qData[1:] - default: - } - } - } - - } -} - -func (q *queue[T]) start(ctx context.Context, wg *sync.WaitGroup) { - wg.Add(1) - go q.worker(ctx, wg) -} - -func newQueue[T any]() *queue[T] { - return &queue[T]{ - in: make(chan T), - out: make(chan T), - } -} diff --git a/core/services/workflows/queue_test.go b/core/services/workflows/queue_test.go deleted file mode 100644 index d115cd69a4e..00000000000 --- a/core/services/workflows/queue_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package workflows - -import ( - "sync" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" -) - -func TestQueue(t *testing.T) { - ctx := testutils.Context(t) - q := newQueue[int]() - var wg sync.WaitGroup - q.start(ctx, &wg) - - ints := []int{1, 2, 3, 4, 5} - for _, i := range ints { - q.in <- i - } - - got := []int{} - for i := 0; i < 5; i++ { - got = append(got, <-q.out) - } - - assert.Equal(t, ints, got) - - assert.Len(t, q.out, 0) -} diff --git a/core/services/workflows/store.go b/core/services/workflows/store.go index bb9a8d14bcc..57fd7e051cc 100644 --- a/core/services/workflows/store.go +++ b/core/services/workflows/store.go @@ -6,16 +6,19 @@ import ( "sync" ) -type store struct { +// `inMemoryStore` is a temporary in-memory +// equivalent of the database table that should persist +// workflow progress. +type inMemoryStore struct { idToState map[string]*executionState mu sync.RWMutex } -func newStore() *store { - return &store{idToState: map[string]*executionState{}} +func newInMemoryStore() *inMemoryStore { + return &inMemoryStore{idToState: map[string]*executionState{}} } -func (s *store) add(ctx context.Context, state *executionState) error { +func (s *inMemoryStore) add(ctx context.Context, state *executionState) error { s.mu.Lock() defer s.mu.Unlock() _, ok := s.idToState[state.executionID] @@ -27,7 +30,7 @@ func (s *store) add(ctx context.Context, state *executionState) error { return nil } -func (s *store) updateStep(ctx context.Context, step *stepState) (executionState, error) { +func (s *inMemoryStore) updateStep(ctx context.Context, step *stepState) (executionState, error) { s.mu.Lock() defer s.mu.Unlock() state, ok := s.idToState[step.executionID] @@ -39,7 +42,7 @@ func (s *store) updateStep(ctx context.Context, step *stepState) (executionState return *state, nil } -func (s *store) updateStatus(ctx context.Context, executionID string, status string) error { +func (s *inMemoryStore) updateStatus(ctx context.Context, executionID string, status string) error { s.mu.Lock() defer s.mu.Unlock() state, ok := s.idToState[executionID] diff --git a/core/services/workflows/workflow.go b/core/services/workflows/workflow.go index 52392607c60..7a8423c07a7 100644 --- a/core/services/workflows/workflow.go +++ b/core/services/workflows/workflow.go @@ -11,7 +11,13 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/values" ) -type Capability struct { +type stepRequest struct { + executionID string + stepRef string + state executionState +} + +type capabilityDefinition struct { Type string `yaml:"type"` Ref string `yaml:"ref"` Inputs map[string]any `yaml:"inputs"` @@ -19,14 +25,14 @@ type Capability struct { } type workflowSpec struct { - Triggers []Capability `yaml:"triggers"` - Actions []Capability `yaml:"actions"` - Consensus []Capability `yaml:"consensus"` - Targets []Capability `yaml:"targets"` + Triggers []capabilityDefinition `yaml:"triggers"` + Actions []capabilityDefinition `yaml:"actions"` + Consensus []capabilityDefinition `yaml:"consensus"` + Targets []capabilityDefinition `yaml:"targets"` } -func (w *workflowSpec) steps() []Capability { - s := []Capability{} +func (w *workflowSpec) steps() []capabilityDefinition { + s := []capabilityDefinition{} s = append(s, w.Actions...) s = append(s, w.Consensus...) s = append(s, w.Targets...) @@ -90,15 +96,15 @@ func (w *workflow) adjacentNodes(start string) ([]*node, error) { } type node struct { - Capability - dependencies []string - cachedCapability capabilities.CallbackExecutable - cachedConfig *values.Map + capabilityDefinition + dependencies []string + capability capabilities.CallbackExecutable + config *values.Map } type triggerCapability struct { - Capability - cachedTrigger capabilities.TriggerCapability + capabilityDefinition + trigger capabilities.TriggerCapability } const ( @@ -106,8 +112,8 @@ const ( ) func Parse(yamlWorkflow string) (*workflow, error) { - wfs := &workflowSpec{} - err := yaml.Unmarshal([]byte(yamlWorkflow), wfs) + spec := &workflowSpec{} + err := yaml.Unmarshal([]byte(yamlWorkflow), spec) if err != nil { return nil, err } @@ -116,6 +122,9 @@ func Parse(yamlWorkflow string) (*workflow, error) { // empty graph with just one starting entry: `trigger`. // This provides the starting point for our graph and // points to all dependent nodes. + // Note: all triggers are represented by a single node called + // `trigger`. This is because for workflows with multiple triggers + // only one trigger will have started the workflow. nodeHash := func(n *node) string { return n.Ref } @@ -125,20 +134,25 @@ func Parse(yamlWorkflow string) (*workflow, error) { graph.Directed(), ) err = g.AddVertex(&node{ - Capability: Capability{Ref: keywordTrigger}, + capabilityDefinition: capabilityDefinition{Ref: keywordTrigger}, }) if err != nil { return nil, err } - for _, s := range wfs.steps() { + // Next, let's populate the other entries in the graph. + for _, s := range spec.steps() { + // TODO: The workflow format spec doesn't always require a `Ref` + // to be provided (triggers and targets don't have a `Ref` for example). + // To handle this, we default the `Ref` to the type, but ideally we + // should find a better long-term way to handle this. if s.Ref == "" { s.Ref = s.Type } - err := g.AddVertex(&node{Capability: s}) - if err != nil { - return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, err) + innerErr := g.AddVertex(&node{capabilityDefinition: s}) + if innerErr != nil { + return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, innerErr) } } @@ -146,10 +160,13 @@ func Parse(yamlWorkflow string) (*workflow, error) { if err != nil { return nil, err } + + // Next, let's iterate over the nodes and populate + // any edges. for nodeRef := range nodeRefs { - node, err := g.Vertex(nodeRef) - if err != nil { - return nil, err + node, innerErr := g.Vertex(nodeRef) + if innerErr != nil { + return nil, innerErr } refs, innerErr := findRefs(node.Inputs) @@ -159,21 +176,21 @@ func Parse(yamlWorkflow string) (*workflow, error) { node.dependencies = refs for _, r := range refs { - err = g.AddEdge(r, node.Ref) - if err != nil { - return nil, err + innerErr = g.AddEdge(r, node.Ref) + if innerErr != nil { + return nil, innerErr } } } triggerNodes := []*triggerCapability{} - for _, t := range wfs.Triggers { + for _, t := range spec.Triggers { triggerNodes = append(triggerNodes, &triggerCapability{ - Capability: t, + capabilityDefinition: t, }) } wf := &workflow{ - spec: wfs, + spec: spec, Graph: g, triggers: triggerNodes, } diff --git a/core/services/workflows/workflow_test.go b/core/services/workflows/workflow_test.go index f1ba8f0b64f..93b5bf64f56 100644 --- a/core/services/workflows/workflow_test.go +++ b/core/services/workflows/workflow_test.go @@ -148,6 +148,44 @@ targets: `, errMsg: "source vertex missing-action: vertex not found", }, + { + name: "two trigger nodes", + yaml: ` +triggers: + - type: "a-trigger" + - type: "a-second-trigger" + +actions: + - type: "an-action" + ref: "an-action" + inputs: + trigger_output: $(trigger.outputs) + +consensus: + - type: "a-consensus" + ref: "a-consensus" + inputs: + an-action_output: $(an-action.outputs) + +targets: + - type: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +`, + graph: map[string]map[string]struct{}{ + keywordTrigger: { + "an-action": struct{}{}, + }, + "an-action": { + "a-consensus": struct{}{}, + }, + "a-consensus": { + "a-target": struct{}{}, + }, + "a-target": {}, + }, + }, } for _, tc := range testCases {