Skip to content

Commit

Permalink
feat: improve logs on cyclic deps (#562)
Browse files Browse the repository at this point in the history
* feat: improve logs on cyclic deps

* refactor: stringify cyclic paths

* refactor: use treeprint

* fix: lint

* refactor: combine cyclic validation into one function

* refactor: rename to nodeName
  • Loading branch information
deryrahman authored and arinda-arif committed Nov 16, 2022
1 parent aa16c89 commit 3da84f9
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 48 deletions.
104 changes: 71 additions & 33 deletions core/tree/multi_root_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package tree
import (
"errors"
"fmt"
"sort"

"github.com/xlab/treeprint"
)

// ErrCyclicDependencyEncountered is triggered a tree has a cyclic dependency
Expand Down Expand Up @@ -46,51 +49,86 @@ func (t *MultiRootTree) GetNodeByName(dagName string) (*TreeNode, bool) {
return value, ok
}

// IsCyclic - detects if there are any cycles in the tree
func (t *MultiRootTree) IsCyclic() error {
visitedMap := make(map[string]bool)
for _, node := range t.dataMap {
if _, visited := visitedMap[node.GetName()]; !visited {
pathMap := make(map[string]bool)
err := t.hasCycle(node, visitedMap, pathMap)
if err != nil {
return err
}
}
// get sorted nodeNames from dataMap
func (t *MultiRootTree) getSortedNodeNames() []string {
nodeNames := []string{}
for nodeName := range t.dataMap {
nodeNames = append(nodeNames, nodeName)
}
return nil
sort.Strings(nodeNames)
return nodeNames
}

// runs a DFS on a given tree using visitor pattern
func (t *MultiRootTree) hasCycle(root *TreeNode, visited, pathMap map[string]bool) error {
_, isNodeVisited := visited[root.GetName()]
if !isNodeVisited || !visited[root.GetName()] {
pathMap[root.GetName()] = true
visited[root.GetName()] = true
var cyclicErr error
for _, child := range root.Dependents {
n, _ := t.GetNodeByName(child.GetName())
_, isChildVisited := visited[child.GetName()]
if !isChildVisited || !visited[child.GetName()] {
cyclicErr = t.hasCycle(n, visited, pathMap)
// ValidateCyclic - detects if there are any cycles in the tree
func (t *MultiRootTree) ValidateCyclic() error {
// runs a DFS on a given tree using visitor pattern
var checkCyclic func(*TreeNode, map[string]bool, map[string]bool, *[]string) error
checkCyclic = func(node *TreeNode, visitedNodeNames, visitedPaths map[string]bool, orderedVisitedPaths *[]string) error {
_, isNodeVisited := visitedNodeNames[node.GetName()]
if !isNodeVisited || !visitedNodeNames[node.GetName()] {
visitedPaths[node.GetName()] = true
visitedNodeNames[node.GetName()] = true
*orderedVisitedPaths = append(*orderedVisitedPaths, node.GetName())
var cyclicErr error
for _, child := range node.Dependents {
n, _ := t.GetNodeByName(child.GetName())
_, isChildVisited := visitedNodeNames[child.GetName()]
if !isChildVisited || !visitedNodeNames[child.GetName()] {
cyclicErr = checkCyclic(n, visitedNodeNames, visitedPaths, orderedVisitedPaths)
}
if cyclicErr != nil {
return cyclicErr
}

if isVisited, ok := visitedPaths[child.GetName()]; ok && isVisited {
*orderedVisitedPaths = append(*orderedVisitedPaths, child.GetName())
cyclicErr = fmt.Errorf("%w: %s", ErrCyclicDependencyEncountered, prettifyPaths(*orderedVisitedPaths))
}
if cyclicErr != nil {
return cyclicErr
}
}
if cyclicErr != nil {
return cyclicErr
visitedPaths[node.GetName()] = false
i := 0
for i < len(*orderedVisitedPaths) && (*orderedVisitedPaths)[i] != node.GetName() {
i++
}
*orderedVisitedPaths = append((*orderedVisitedPaths)[:i], (*orderedVisitedPaths)[i+1:]...)
}
return nil
}

_, childAlreadyInPath := pathMap[child.GetName()] // 1 -> 2 -> 1
if childAlreadyInPath && pathMap[child.GetName()] {
cyclicErr = fmt.Errorf("%w: %s", ErrCyclicDependencyEncountered, root.GetName())
}
if cyclicErr != nil {
return cyclicErr
visitedNodeNames := make(map[string]bool)
nodeNames := t.getSortedNodeNames()
for _, nodeName := range nodeNames {
node := t.dataMap[nodeName]
if _, visited := visitedNodeNames[node.GetName()]; !visited {
visitedPaths := map[string]bool{}
orderedVisitedPaths := []string{}
if err := checkCyclic(node, visitedNodeNames, visitedPaths, &orderedVisitedPaths); err != nil {
return err
}
}
pathMap[root.GetName()] = false
}

return nil
}

func prettifyPaths(paths []string) string {
if len(paths) == 0 {
return ""
}
i := len(paths) - 1
root := treeprint.NewWithRoot(paths[i])
tree := root

for i--; i >= 0; i-- {
tree = tree.AddBranch(paths[i])
}

return "\n" + root.String()
}

// NewMultiRootTree returns an instance of multi root dag tree
func NewMultiRootTree() *MultiRootTree {
return &MultiRootTree{
Expand Down
31 changes: 23 additions & 8 deletions core/tree/multi_root_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestMultiRootDagTree(t *testing.T) {
multiRootTree.AddNodeIfNotExist(treeNode1)
multiRootTree.AddNodeIfNotExist(treeNode2)

err := multiRootTree.IsCyclic()
err := multiRootTree.ValidateCyclic()
assert.NotNil(t, err)
assert.Contains(t, err.Error(), tree.ErrCyclicDependencyEncountered.Error())
})
Expand All @@ -38,22 +38,37 @@ func TestMultiRootDagTree(t *testing.T) {
assert.Equal(t, 1, len(rootNodes))
assert.Equal(t, "job1", rootNodes[0].Data.GetName())
})
t.Run("IsCyclic", func(t *testing.T) {
t.Run("ValidateCyclic", func(t *testing.T) {
t.Run("should throw an error if cyclic", func(t *testing.T) {
treeNode1 := tree.NewTreeNode(models.JobSpec{
Name: "job1",
Name: "pilotdata-integration.playground.job1",
})
treeNode2 := tree.NewTreeNode(models.JobSpec{
Name: "job2",
Name: "pilotdata-integration.playground.job2",
})
treeNode3 := tree.NewTreeNode(models.JobSpec{
Name: "pilotdata-integration.playground.job3",
})
treeNode4 := tree.NewTreeNode(models.JobSpec{
Name: "pilotdata-integration.playground.job4",
})
multiRootTree := tree.NewMultiRootTree()
multiRootTree.AddNode(treeNode1)
multiRootTree.AddNode(treeNode2)
treeNode1.AddDependent(treeNode2)
multiRootTree.AddNode(treeNode3)
multiRootTree.AddNode(treeNode4)
treeNode4.AddDependent(treeNode3)
treeNode3.AddDependent(treeNode2)
treeNode2.AddDependent(treeNode1)
err := multiRootTree.IsCyclic()
treeNode2.AddDependent(treeNode4)
err := multiRootTree.ValidateCyclic()
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "cycle dependency")
assert.Equal(t, `a cycle dependency encountered in the tree:
pilotdata-integration.playground.job2
└── pilotdata-integration.playground.job3
└── pilotdata-integration.playground.job4
└── pilotdata-integration.playground.job2
`, err.Error())
})
t.Run("should not return error if not cyclic", func(t *testing.T) {
treeNode1 := tree.NewTreeNode(models.JobSpec{
Expand All @@ -66,7 +81,7 @@ func TestMultiRootDagTree(t *testing.T) {
multiRootTree.AddNode(treeNode1)
multiRootTree.AddNode(treeNode2)
treeNode1.AddDependent(treeNode2)
err := multiRootTree.IsCyclic()
err := multiRootTree.ValidateCyclic()
assert.Nil(t, err)
})
})
Expand Down
2 changes: 1 addition & 1 deletion job/priority_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (a *priorityResolver) buildMultiRootDependencyTree(jobSpecs []models.JobSpe
}
}

if err := multiRootTree.IsCyclic(); err != nil {
if err := multiRootTree.ValidateCyclic(); err != nil {
return nil, err
}
return multiRootTree, nil
Expand Down
10 changes: 5 additions & 5 deletions job/priority_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func TestMultiRootDAGTree(t *testing.T) {
dagTree.AddNode(node2)
dagTree.AddNode(node3)

err := dagTree.IsCyclic()
err := dagTree.ValidateCyclic()
assert.NotNil(t, err)
assert.Contains(t, err.Error(), tree.ErrCyclicDependencyEncountered.Error())
})
Expand Down Expand Up @@ -589,7 +589,7 @@ func TestMultiRootDAGTree(t *testing.T) {
dagTree.AddNode(node1211)
dagTree.AddNode(node1212)

err := dagTree.IsCyclic()
err := dagTree.ValidateCyclic()
assert.Nil(t, err)

depsMap := map[*tree.TreeNode]int{
Expand All @@ -614,7 +614,7 @@ func TestMultiRootDAGTree(t *testing.T) {
dagTree.AddNode(node2)
dagTree.MarkRoot(node2)

err := dagTree.IsCyclic()
err := dagTree.ValidateCyclic()
assert.Nil(t, err)
})

Expand All @@ -625,7 +625,7 @@ func TestMultiRootDAGTree(t *testing.T) {
dagTree := tree.NewMultiRootTree()
dagTree.AddNode(node2)

err := dagTree.IsCyclic()
err := dagTree.ValidateCyclic()
assert.Nil(t, err)
})

Expand Down Expand Up @@ -658,7 +658,7 @@ func TestMultiRootDAGTree(t *testing.T) {
dagTree.AddNode(node31)
dagTree.AddNode(node41)

err := dagTree.IsCyclic()
err := dagTree.ValidateCyclic()
assert.NotNil(t, err)
assert.Contains(t, err.Error(), tree.ErrCyclicDependencyEncountered.Error())
})
Expand Down
2 changes: 1 addition & 1 deletion job/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ func populateDownstreamDAGs(dagTree *tree.MultiRootTree, jobSpec models.JobSpec,
}
}

if err := dagTree.IsCyclic(); err != nil {
if err := dagTree.ValidateCyclic(); err != nil {
return nil, err
}

Expand Down

0 comments on commit 3da84f9

Please sign in to comment.