Skip to content

Commit

Permalink
make the DM a field in CLab with a WithDependencyManager Option
Browse files Browse the repository at this point in the history
  • Loading branch information
steiler committed Dec 1, 2023
1 parent a7df071 commit 44085d4
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 48 deletions.
80 changes: 43 additions & 37 deletions clab/clab.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ import (
)

type CLab struct {
Config *Config `json:"config,omitempty"`
TopoPaths *types.TopoPaths
Nodes map[string]nodes.Node `json:"nodes,omitempty"`
Links map[int]links.Link `json:"links,omitempty"`
Endpoints []links.Endpoint
Runtimes map[string]runtime.ContainerRuntime `json:"runtimes,omitempty"`
Config *Config `json:"config,omitempty"`
TopoPaths *types.TopoPaths
Nodes map[string]nodes.Node `json:"nodes,omitempty"`
Links map[int]links.Link `json:"links,omitempty"`
Endpoints []links.Endpoint
Runtimes map[string]runtime.ContainerRuntime `json:"runtimes,omitempty"`
dependencyManager dependency_manager.DependencyManager
// reg is a registry of node kinds
Reg *nodes.NodeRegistry
Cert *cert.Cert
Expand Down Expand Up @@ -66,6 +67,14 @@ func WithTimeout(dur time.Duration) ClabOption {
}
}

// WithDependencyManager returns a ClabOption that stores the given
// DependencyManager in the CLab instance's dependencyManager field.
func WithDependencyManager(dm dependency_manager.DependencyManager) ClabOption {
	return func(clab *CLab) error {
		clab.dependencyManager = dm
		return nil
	}
}

// WithDebug sets debug mode.
func WithDebug(debug bool) ClabOption {
return func(c *CLab) error {
Expand Down Expand Up @@ -295,9 +304,6 @@ func NewContainerLab(opts ...ClabOption) (*CLab, error) {
// init a new NodeRegistry
c.Reg = nodes.NewNodeRegistry()

// register all nodes
c.RegisterNodes()

for _, opt := range opts {
err := opt(c)
if err != nil {
Expand Down Expand Up @@ -349,57 +355,57 @@ func (c *CLab) GlobalRuntime() runtime.ContainerRuntime {

// CreateNodes schedules nodes creation and returns a waitgroup for all nodes.
// Nodes interdependencies are created in this function.
func (c *CLab) CreateNodes(ctx context.Context, maxWorkers uint,
dm dependency_manager.DependencyManager, skipPostDeploy bool) (*sync.WaitGroup, error) {
func (c *CLab) CreateNodes(ctx context.Context, maxWorkers uint, skipPostDeploy bool) (*sync.WaitGroup, error) {

for nodeName := range c.Nodes {
dm.AddNode(nodeName)
c.dependencyManager.AddNode(nodeName)
}

// nodes with static mgmt IP should be scheduled before the dynamic ones
err := createStaticDynamicDependency(c.Nodes, dm)
err := c.createStaticDynamicDependency()
if err != nil {
return nil, err
}

// create user-defined node dependencies done with `wait-for` node property
err = createWaitForDependency(c.Nodes, dm)
err = c.createWaitForDependency()
if err != nil {
return nil, err
}

// create a set of dependencies, that makes the ignite nodes start one after the other
err = createIgniteSerialDependency(c.Nodes, dm)
err = c.createIgniteSerialDependency()
if err != nil {
return nil, err
}

// make network namespace shared containers start in the right order
createNamespaceSharingDependency(c.Nodes, dm)
c.createNamespaceSharingDependency()

// Add possible additional dependencies here

// make sure that there are no unresolvable dependencies, which would deadlock.
err = dm.CheckAcyclicity()
err = c.dependencyManager.CheckAcyclicity()
if err != nil {
return nil, err
}

// start scheduling
NodesWg := c.scheduleNodes(ctx, int(maxWorkers), c.Nodes, dm, skipPostDeploy)
NodesWg := c.scheduleNodes(ctx, int(maxWorkers), skipPostDeploy)

return NodesWg, nil
}

// create a set of dependencies, that makes the ignite nodes start one after the other.
func createIgniteSerialDependency(nodeMap map[string]nodes.Node, dm dependency_manager.DependencyManager) error {
func (c *CLab) createIgniteSerialDependency() error {
var prevIgniteNode nodes.Node
// iterate through the nodes
for _, n := range nodeMap {
for _, n := range c.Nodes {
// find nodes that should run with IgniteRuntime
if n.GetRuntime().GetName() == ignite.RuntimeName {
if prevIgniteNode != nil {
// add a dependency to the previously found ignite node
dm.AddDependency(prevIgniteNode.Config().ShortName, types.WaitForCreate, n.Config().ShortName, types.WaitForCreate)
c.dependencyManager.AddDependency(prevIgniteNode.Config().ShortName, types.WaitForCreate, n.Config().ShortName, types.WaitForCreate)
}
prevIgniteNode = n
}
Expand All @@ -415,8 +421,8 @@ func createIgniteSerialDependency(nodeMap map[string]nodes.Node, dm dependency_m
// network-mode: container:node_b
//
// then node_a depends on node_b, and waits for node_b to be scheduled first.
func createNamespaceSharingDependency(nodeMap map[string]nodes.Node, dm dependency_manager.DependencyManager) {
for nodeName, n := range nodeMap {
func (c *CLab) createNamespaceSharingDependency() {
for nodeName, n := range c.Nodes {
nodeConfig := n.Config()
netModeArr := strings.SplitN(nodeConfig.NetworkMode, ":", 2)
if netModeArr[0] != "container" {
Expand All @@ -428,23 +434,24 @@ func createNamespaceSharingDependency(nodeMap map[string]nodes.Node, dm dependen

// if the container does not exist in the list of containers, it must be an external dependency
// it can be ignored for internal processing so -> continue
if _, exists := nodeMap[netModeArr[1]]; !exists {
if _, exists := c.Nodes[netModeArr[1]]; !exists {
continue
}

// since the referenced container is clab-managed node, we create a dependency between the nodes
dm.AddDependency(nodeName, types.WaitForCreate, referencedNode, types.WaitForCreate)
c.dependencyManager.AddDependency(nodeName, types.WaitForCreate, referencedNode, types.WaitForCreate)
}
}

// createStaticDynamicDependency creates the dependencies between the nodes such that all nodes with dynamic mgmt IP
// are dependent on the nodes with static mgmt IP. This results in nodes with static mgmt IP to be scheduled before dynamic ones.
func createStaticDynamicDependency(n map[string]nodes.Node, dm dependency_manager.DependencyManager) error {
func (c *CLab) createStaticDynamicDependency() error {

staticIPNodes := make(map[string]nodes.Node)
dynIPNodes := make(map[string]nodes.Node)

// divide the nodes into static and dynamic mgmt IP nodes.
for name, n := range n {
for name, n := range c.Nodes {
if n.Config().MgmtIPv4Address != "" || n.Config().MgmtIPv6Address != "" {
staticIPNodes[name] = n
continue
Expand All @@ -456,7 +463,7 @@ func createStaticDynamicDependency(n map[string]nodes.Node, dm dependency_manage
for dynNodeName := range dynIPNodes {
// and add their wait group to the the static nodes, while increasing the waitgroup
for staticNodeName := range staticIPNodes {
err := dm.AddDependency(dynNodeName, types.WaitForCreate, staticNodeName, types.WaitForCreate)
err := c.dependencyManager.AddDependency(dynNodeName, types.WaitForCreate, staticNodeName, types.WaitForCreate)
if err != nil {
return err
}
Expand All @@ -466,12 +473,12 @@ func createStaticDynamicDependency(n map[string]nodes.Node, dm dependency_manage
}

// createWaitForDependency reflects the dependencies defined in the configuration via the wait-for field.
func createWaitForDependency(n map[string]nodes.Node, dm dependency_manager.DependencyManager) error {
for waiterNode, node := range n {
func (c *CLab) createWaitForDependency() error {
for waiterNode, node := range c.Nodes {
// add node's waitFor nodes to the dependency manager
for phase, waitForNodes := range node.Config().WaitFor {
for _, waitForNode := range waitForNodes {
err := dm.AddDependency(waiterNode, phase, waitForNode.Node, waitForNode.State)
err := c.dependencyManager.AddDependency(waiterNode, phase, waitForNode.Node, waitForNode.State)
if err != nil {
return err
}
Expand All @@ -482,8 +489,7 @@ func createWaitForDependency(n map[string]nodes.Node, dm dependency_manager.Depe
return nil
}

func (c *CLab) scheduleNodes(ctx context.Context, maxWorkers int,
scheduledNodes map[string]nodes.Node, dm dependency_manager.DependencyManager, skipPostDeploy bool) *sync.WaitGroup {
func (c *CLab) scheduleNodes(ctx context.Context, maxWorkers int, skipPostDeploy bool) *sync.WaitGroup {
concurrentChan := make(chan nodes.Node)

workerFunc := func(i int, input chan nodes.Node, wg *sync.WaitGroup,
Expand Down Expand Up @@ -629,7 +635,7 @@ func (c *CLab) scheduleNodes(ctx context.Context, maxWorkers int,
}
}

numScheduledNodes := len(scheduledNodes)
numScheduledNodes := len(c.Nodes)
if numScheduledNodes < maxWorkers {
maxWorkers = numScheduledNodes
}
Expand All @@ -640,15 +646,15 @@ func (c *CLab) scheduleNodes(ctx context.Context, maxWorkers int,
// it's safe to not check if all nodes are serial because in that case
// maxWorkers will be 0
for i := 0; i < maxWorkers; i++ {
go workerFunc(i, concurrentChan, wg, dm)
go workerFunc(i, concurrentChan, wg, c.dependencyManager)
}

// Waitgroup used to protect the channel towards the workers of being closed to early
workerFuncChWG := new(sync.WaitGroup)

// schedule nodes via a go func to create links in parallel
go func() {
for _, n := range scheduledNodes {
for _, n := range c.Nodes {
workerFuncChWG.Add(1)
// start a func for all the containers, then will wait for their own waitgroups
// to be set to zero by their depending containers, then enqueue to the creation channel
Expand All @@ -666,7 +672,7 @@ func (c *CLab) scheduleNodes(ctx context.Context, maxWorkers int,
workerChan <- node
// indicate we are done, such that only when all of these functions are done, the workerChan is being closed
wfcwg.Done()
}(n, dm, concurrentChan, workerFuncChWG) // execute this function straight away
}(n, c.dependencyManager, concurrentChan, workerFuncChWG) // execute this function straight away
}

// Gate to make sure the channel is not closed before all the nodes made it though the channel
Expand Down
33 changes: 30 additions & 3 deletions clab/clab_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,17 @@ func Test_createNamespaceSharingDependencyOne(t *testing.T) {
// retrieve a map of nodes
nodeMap := getNodeMap(mockCtrl)

clab := &CLab{
Nodes: nodeMap,
}

err := WithDependencyManager(dm)(clab)
if err != nil {
t.Error(err)
}

dm.EXPECT().AddDependency("node3", types.WaitForCreate, "node2", types.WaitForCreate)
createNamespaceSharingDependency(nodeMap, dm)
clab.createNamespaceSharingDependency()
}

func Test_createStaticDynamicDependency(t *testing.T) {
Expand All @@ -46,14 +55,23 @@ func Test_createStaticDynamicDependency(t *testing.T) {
// retrieve a map of nodes
nodeMap := getNodeMap(mockCtrl)

clab := &CLab{
Nodes: nodeMap,
}

err := WithDependencyManager(dm)(clab)
if err != nil {
t.Error(err)
}

dm.EXPECT().AddDependency("node1", types.WaitForCreate, "node4", types.WaitForCreate)
dm.EXPECT().AddDependency("node2", types.WaitForCreate, "node4", types.WaitForCreate)
dm.EXPECT().AddDependency("node3", types.WaitForCreate, "node4", types.WaitForCreate)
dm.EXPECT().AddDependency("node1", types.WaitForCreate, "node5", types.WaitForCreate)
dm.EXPECT().AddDependency("node2", types.WaitForCreate, "node5", types.WaitForCreate)
dm.EXPECT().AddDependency("node3", types.WaitForCreate, "node5", types.WaitForCreate)

createStaticDynamicDependency(nodeMap, dm)
clab.createStaticDynamicDependency()
}

// getNodeMap return a map of nodes for testing purpose.
Expand Down Expand Up @@ -162,13 +180,22 @@ func Test_createWaitForDependency(t *testing.T) {
// retrieve a map of nodes
nodeMap := getNodeMap(mockCtrl)

clab := &CLab{
Nodes: nodeMap,
}

err := WithDependencyManager(dm)(clab)
if err != nil {
t.Error(err)
}

dm.EXPECT().AddDependency("node2", types.WaitForCreate, "node1", types.WaitForCreate)
dm.EXPECT().AddDependency("node3", types.WaitForCreate, "node1", types.WaitForCreate)
dm.EXPECT().AddDependency("node3", types.WaitForCreate, "node2", types.WaitForCreate)
dm.EXPECT().AddDependency("node5", types.WaitForCreate, "node3", types.WaitForCreate)
dm.EXPECT().AddDependency("node5", types.WaitForCreate, "node4", types.WaitForCreate)

err := createWaitForDependency(nodeMap, dm)
err = clab.createWaitForDependency()
if err != nil {
t.Error(err)
}
Expand Down
15 changes: 7 additions & 8 deletions cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func deployFn(_ *cobra.Command, _ []string) error {
GracefulShutdown: graceful,
},
),
clab.WithDependencyManager(dependency_manager.NewDependencyManager()),
clab.WithDebug(debug),
}

Expand Down Expand Up @@ -216,9 +217,7 @@ func deployFn(_ *cobra.Command, _ []string) error {
n.Config().ExtraHosts = extraHosts
}

dm := dependency_manager.NewDependencyManager()

nodesWg, err := c.CreateNodes(ctx, nodeWorkers, dm, skipPostDeploy)
nodesWg, err := c.CreateNodes(ctx, nodeWorkers, skipPostDeploy)
if err != nil {
return err
}
Expand All @@ -235,18 +234,18 @@ func deployFn(_ *cobra.Command, _ []string) error {
return err
}

containers, err := c.ListNodesContainers(ctx)
if err != nil {
return err
}

// generate graph of the lab topology
if graph {
if err = c.GenerateDotGraph(); err != nil {
log.Error(err)
}
}

containers, err := c.ListNodesContainers(ctx)
if err != nil {
return err
}

log.Info("Adding containerlab host entries to /etc/hosts file")
err = clab.AppendHostsFileEntries(containers, c.Config.Name)
if err != nil {
Expand Down

0 comments on commit 44085d4

Please sign in to comment.