Skip to content

Commit

Permalink
Replace graph with graph lib
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Mar 19, 2024
1 parent 6dafba1 commit 0cec920
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 196 deletions.
1 change: 1 addition & 0 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ require (
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dominikbraun/graph v0.23.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/esote/minmaxheap v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo=
github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
Expand Down
34 changes: 23 additions & 11 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,12 @@ func (e *Engine) loop(ctx context.Context) {
case resp := <-e.callbackCh:
if resp.Err != nil {
e.logger.Errorf("trigger event was an error; not executing", resp.Err)
} else {
e.startExecution(ctx, resp.Value)
continue
}

err := e.startExecution(ctx, resp.Value)
if err != nil {
e.logger.Errorf("failed to start execution: %w", err)
}
case dm := <-e.queue.out:
<-e.newWorkerCh
Expand All @@ -210,7 +214,7 @@ func (e *Engine) loop(ctx context.Context) {
}
}

func (e *Engine) startExecution(ctx context.Context, event values.Value) {
func (e *Engine) startExecution(ctx context.Context, event values.Value) error {
executionID := uuid.New().String()
e.logger.Debugw("executing on a trigger event", "event", event, "executionID", executionID)
ec := &executionState{
Expand All @@ -229,14 +233,21 @@ func (e *Engine) startExecution(ctx context.Context, event values.Value) {

err := e.store.add(ctx, ec)
if err != nil {
return
return err
}

// Find the tasks we need to fire when a trigger has fired and enqueue them.
for _, node := range e.workflow.adjacentNodes(keywordTrigger) {
an, err := e.workflow.adjacentNodes(keywordTrigger)
if err != nil {
return err
}

for _, node := range an {
e.logger.Debugw("step request enqueued", "ref", node.Ref, "executionID", executionID)
e.queue.in <- stepRequest{state: copyState(*ec), stepRef: node.Ref}
}

return nil
}

func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate stepState) error {
Expand All @@ -247,7 +258,10 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate stepState) err

switch stepUpdate.status {
case statusCompleted:
adjacentNodes := e.workflow.adjacentNodes(stepUpdate.ref)
adjacentNodes, err := e.workflow.adjacentNodes(stepUpdate.ref)
if err != nil {
return err
}
// There are no nodes left to process in the current path, so let's check if
// we've completed the workflow.
// If not, we'll check adjacent nodes for any that are ready to process.
Expand Down Expand Up @@ -337,9 +351,9 @@ func (e *Engine) workerForStep(ctx context.Context, msg stepRequest) {
}

func (e *Engine) handleStep(ctx context.Context, msg stepRequest) (*values.Map, values.Value, error) {
node, ok := e.workflow.nodes[msg.stepRef]
if !ok {
return nil, nil, fmt.Errorf("could not get step for %s", msg.stepRef)
node, err := e.workflow.Vertex(msg.stepRef)
if err != nil {
return nil, nil, err
}

i, err := findAndInterpolateAllKeys(node.Inputs, msg.state)
Expand Down Expand Up @@ -470,7 +484,6 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
newWorkerCh <- struct{}{}
}

var wg sync.WaitGroup
engine = &Engine{
logger: cfg.Lggr.Named("WorkflowEngine"),
registry: cfg.Registry,
Expand All @@ -480,7 +493,6 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
newWorkerCh: newWorkerCh,
stepUpdateCh: make(chan stepState),
callbackCh: make(chan capabilities.CapabilityResponse),
wg: wg,
stopCh: make(chan struct{}),
}
return engine, nil
Expand Down
62 changes: 0 additions & 62 deletions core/services/workflows/graph.go

This file was deleted.

49 changes: 0 additions & 49 deletions core/services/workflows/graph_test.go

This file was deleted.

125 changes: 82 additions & 43 deletions core/services/workflows/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

"gopkg.in/yaml.v3"

"github.com/dominikbraun/graph"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)
Expand Down Expand Up @@ -32,13 +34,61 @@ func (w *workflowSpec) steps() []Capability {
}

type workflow struct {
*graph[*node]
graph.Graph[string, *node]

triggers []*triggerCapability

spec *workflowSpec
}

func (w *workflow) walkDo(start string, do func(n *node) error) error {
var outerErr error
err := graph.BFS(w.Graph, start, func(ref string) bool {
n, err := w.Graph.Vertex(ref)
if err != nil {
outerErr = err
return true
}

err = do(n)
if err != nil {
outerErr = err
return true
}

return false
})
if err != nil {
return err
}

return outerErr
}

func (w *workflow) adjacentNodes(start string) ([]*node, error) {
nodes := []*node{}
m, err := w.Graph.AdjacencyMap()
if err != nil {
return nil, err
}

adj, ok := m[start]
if !ok {
return nil, fmt.Errorf("could not find node with ref %s", start)
}

for adjacentRef := range adj {
n, err := w.Graph.Vertex(adjacentRef)
if err != nil {
return nil, err
}

nodes = append(nodes, n)
}

return nodes, nil
}

type node struct {
Capability
dependencies []string
Expand Down Expand Up @@ -66,64 +116,53 @@ func Parse(yamlWorkflow string) (*workflow, error) {
// empty graph with just one starting entry: `trigger`.
// This provides the starting point for our graph and
// points to all dependent nodes.
nodes := map[string]*node{
keywordTrigger: {Capability: Capability{Ref: keywordTrigger}},
nodeHash := func(n *node) string {
return n.Ref
}
adjacencies := map[string]map[string]struct{}{
keywordTrigger: {},
}
graph := &graph[*node]{
adjacencies: adjacencies,
nodes: nodes,
g := graph.New(
nodeHash,
graph.PreventCycles(),
graph.Directed(),
)
err = g.AddVertex(&node{
Capability: Capability{Ref: keywordTrigger},
})
if err != nil {
return nil, err
}

for _, s := range wfs.steps() {
// For steps that don't have a ref, use
// the node's type as a default.
if s.Ref == "" {
s.Ref = s.Type
}

_, ok := nodes[s.Ref]
if ok {
return nil, fmt.Errorf("duplicate reference %s found in workflow spec", s.Ref)
err := g.AddVertex(&node{Capability: s})

Check failure on line 139 in core/services/workflows/workflow.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "err" shadows declaration at line 110 (govet)
if err != nil {
return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, err)
}
}

nodes[s.Ref] = &node{Capability: s}
adjacencies[s.Ref] = map[string]struct{}{}
nodeRefs, err := g.AdjacencyMap()
if err != nil {
return nil, err
}
for nodeRef := range nodeRefs {
node, err := g.Vertex(nodeRef)

Check failure on line 150 in core/services/workflows/workflow.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "err" shadows declaration at line 110 (govet)
if err != nil {
return nil, err
}

for _, nd := range nodes {
refs, innerErr := findRefs(nd.Inputs)
refs, innerErr := findRefs(node.Inputs)
if innerErr != nil {
return nil, innerErr
}
nd.dependencies = refs
node.dependencies = refs

for _, r := range refs {
_, ok := nodes[r]
if !ok && r != keywordTrigger {
return nil, fmt.Errorf("invalid reference %s found in workflow spec", r)
err = g.AddEdge(r, node.Ref)
if err != nil {
return nil, err
}

adjacencies[r][nd.Ref] = struct{}{}

var found bool
innerErr := graph.walkDo(nd.Ref, func(n *node) error {
if n.Ref == r {
found = true
return nil
}

return nil
})
if innerErr != nil {
return nil, innerErr
}

if found {
return nil, fmt.Errorf("found circular relationship between %s and %s", r, nd.Ref)
}

}
}

Expand All @@ -135,7 +174,7 @@ func Parse(yamlWorkflow string) (*workflow, error) {
}
wf := &workflow{
spec: wfs,
graph: graph,
Graph: g,
triggers: triggerNodes,
}
return wf, err
Expand Down
Loading

0 comments on commit 0cec920

Please sign in to comment.