Skip to content

Commit

Permalink
Remove node drain call from config daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
e0ne committed Sep 11, 2023
1 parent 0582be9 commit f7645ef
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 85 deletions.
91 changes: 6 additions & 85 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package daemon
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"math/rand"
Expand All @@ -25,7 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -592,7 +590,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
glog.Info("nodeStateSyncHandler(): disable drain is true skipping drain")
} else {
glog.Info("nodeStateSyncHandler(): drain node")
if err := dn.drainNode(); err != nil {
if err := utils.AnnotateNode(dn.name, consts.DrainRequired, dn.kubeClient); err != nil {
return err
}
}
Expand Down Expand Up @@ -641,7 +639,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
}
} else {
if !utils.NodeHasAnnotation(*dn.node, consts.NodeDrainAnnotation, consts.DrainIdle) {
if err := dn.annotateNode(dn.name, consts.DrainIdle); err != nil {
if err := utils.AnnotateNode(dn.name, consts.DrainIdle, dn.kubeClient); err != nil {
glog.Errorf("nodeStateSyncHandler(): failed to annotate node: %v", err)
return err
}
Expand Down Expand Up @@ -692,7 +690,7 @@ func (dn *Daemon) completeDrain() error {
}
}

if err := dn.annotateNode(dn.name, consts.DrainIdle); err != nil {
if err := utils.AnnotateNode(dn.name, consts.DrainIdle, dn.kubeClient); err != nil {
glog.Errorf("completeDrain(): failed to annotate node: %v", err)
return err
}
Expand Down Expand Up @@ -778,46 +776,6 @@ func rebootNode() {
}
}

func (dn *Daemon) annotateNode(node, value string) error {
glog.Infof("annotateNode(): Annotate node %s with: %s", node, value)

oldNode, err := dn.kubeClient.CoreV1().Nodes().Get(context.Background(), dn.name, metav1.GetOptions{})
if err != nil {
glog.Infof("annotateNode(): Failed to get node %s %v, retrying", node, err)
return err
}
oldData, err := json.Marshal(oldNode)
if err != nil {
return err
}

newNode := oldNode.DeepCopy()
if newNode.Annotations == nil {
newNode.Annotations = map[string]string{}
}
if newNode.Annotations[consts.NodeDrainAnnotation] != value {
newNode.Annotations[consts.NodeDrainAnnotation] = value
newData, err := json.Marshal(newNode)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
if err != nil {
return err
}
_, err = dn.kubeClient.CoreV1().Nodes().Patch(context.Background(),
dn.name,
types.StrategicMergePatchType,
patchBytes,
metav1.PatchOptions{})
if err != nil {
glog.Infof("annotateNode(): Failed to patch node %s: %v", node, err)
return err
}
}
return nil
}

func (dn *Daemon) getNodeMachinePool() error {
desiredConfig, ok := dn.node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey]
if !ok {
Expand All @@ -841,7 +799,7 @@ func (dn *Daemon) getNodeMachinePool() error {

func (dn *Daemon) applyDrainRequired() error {
glog.V(2).Info("applyDrainRequired(): no other node is draining")
err := dn.annotateNode(dn.name, consts.DrainRequired)
err := utils.AnnotateNode(dn.name, consts.DrainRequired, dn.kubeClient)
if err != nil {
glog.Errorf("applyDrainRequired(): Failed to annotate node: %v", err)
return err
Expand Down Expand Up @@ -893,7 +851,7 @@ func (dn *Daemon) pauseMCP() error {
glog.V(2).Infof("pauseMCP(): Failed to pause MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, consts.DrainMcpPaused)
err = utils.AnnotateNode(dn.name, consts.DrainMcpPaused, dn.kubeClient)
if err != nil {
glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err)
return
Expand All @@ -909,7 +867,7 @@ func (dn *Daemon) pauseMCP() error {
glog.V(2).Infof("pauseMCP(): fail to resume MCP %s: %v", dn.mcpName, err)
return
}
err = dn.annotateNode(dn.name, consts.Draining)
err = utils.AnnotateNode(dn.name, consts.Draining, dn.kubeClient)
if err != nil {
glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err)
return
Expand Down Expand Up @@ -937,43 +895,6 @@ func (dn *Daemon) pauseMCP() error {
return err
}

func (dn *Daemon) drainNode() error {
glog.Info("drainNode(): Update prepared")
var err error

backoff := wait.Backoff{
Steps: 5,
Duration: 10 * time.Second,
Factor: 2,
}
var lastErr error

glog.Info("drainNode(): Start draining")
if err = wait.ExponentialBackoff(backoff, func() (bool, error) {
err := drain.RunCordonOrUncordon(dn.drainer, dn.node, true)
if err != nil {
lastErr = err
glog.Infof("Cordon failed with: %v, retrying", err)
return false, nil
}
err = drain.RunNodeDrain(dn.drainer, dn.name)
if err == nil {
return true, nil
}
lastErr = err
glog.Infof("Draining failed with: %v, retrying", err)
return false, nil
}); err != nil {
if err == wait.ErrWaitTimeout {
glog.Errorf("drainNode(): failed to drain node (%d tries): %v :%v", backoff.Steps, err, lastErr)
}
glog.Errorf("drainNode(): failed to drain node: %v", err)
return err
}
glog.Info("drainNode(): drain complete")
return nil
}

func tryCreateSwitchdevUdevRule(nodeState *sriovnetworkv1.SriovNetworkNodeState) error {
glog.V(2).Infof("tryCreateSwitchdevUdevRule()")
var newContent string
Expand Down
42 changes: 42 additions & 0 deletions pkg/utils/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package utils

import (
"context"
"encoding/json"
"fmt"
"os"

Expand Down Expand Up @@ -124,3 +125,44 @@ func NodeHasAnnotation(node corev1.Node, annoKey string, value string) bool {
}
return false
}

func AnnotateNode(node, value string, kubeClient kubernetes.Interface) error {
glog.Infof("annotateNode(): Annotate node %s with: %s", node, value)
oldNode, err := kubeClient.CoreV1().Nodes().Get(context.Background(), node, metav1.GetOptions{})
if err != nil {
glog.Infof("annotateNode(): Failed to get node %s %v, retrying", node, err)
return err
}

oldData, err := json.Marshal(oldNode)
if err != nil {
return err
}

newNode := oldNode.DeepCopy()
if newNode.Annotations == nil {
newNode.Annotations = map[string]string{}
}

if newNode.Annotations[consts.NodeDrainAnnotation] != value {
newNode.Annotations[consts.NodeDrainAnnotation] = value
newData, err := json.Marshal(newNode)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
if err != nil {
return err
}
_, err = kubeClient.CoreV1().Nodes().Patch(context.Background(),
node,
types.StrategicMergePatchType,
patchBytes,
metav1.PatchOptions{})
if err != nil {
glog.Infof("annotateNode(): Failed to patch node %s: %v", node, err)
return err
}
}
return nil
}

0 comments on commit f7645ef

Please sign in to comment.