Skip to content

Commit

Permalink
add Refresh method in nodegrp implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabh-11 committed Dec 20, 2024
1 parent 98f20d3 commit 56d80ac
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 65 deletions.
56 changes: 56 additions & 0 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ package mcm
import (
"context"
"fmt"
"github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"slices"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -321,6 +325,8 @@ func (machinedeployment *MachineDeployment) IncreaseSize(delta int) error {
if delta <= 0 {
return fmt.Errorf("size increase must be positive")
}
machinedeployment.scalingMutex.Lock()
defer machinedeployment.scalingMutex.Unlock()
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
if err != nil {
return err
Expand All @@ -344,6 +350,8 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error
if delta >= 0 {
return fmt.Errorf("size decrease size must be negative")
}
machinedeployment.scalingMutex.Lock()
defer machinedeployment.scalingMutex.Unlock()
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
if err != nil {
return err
Expand All @@ -358,6 +366,54 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error
}, "MachineDeployment", "update", machinedeployment.Name)
}

// Refresh resets the priority annotation for the machines that are not present in machines-marked-by-ca-for-deletion annotation on the machineDeployment
func (machineDeployment *MachineDeployment) Refresh() error {
machineDeployment.scalingMutex.Lock()
defer machineDeployment.scalingMutex.Unlock()
mcd, err := machineDeployment.mcmManager.machineDeploymentLister.MachineDeployments(machineDeployment.Namespace).Get(machineDeployment.Name)
if err != nil {
return fmt.Errorf("failed to get machine deployment %s: %v", machineDeployment.Name, err)
}
// ignore the machine deployment if it is in rolling update
if !isRollingUpdateFinished(mcd) {
klog.Infof("machine deployment %s is under rolling update, skipping", machineDeployment.Name)
return nil
}
markedMachines := sets.New(strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",")...)
machines, err := machineDeployment.mcmManager.getMachinesForMachineDeployment(machineDeployment.Name)
if err != nil {
klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
return err
}
var incorrectlyMarkedMachines []*Ref
for _, machine := range machines {
// no need to reset priority for machines already in termination or failed phase
if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {
continue
}
if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) {
incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &Ref{Name: machine.Name, Namespace: machine.Namespace})
}
}
var updatedMarkedMachines []string
for machineName := range markedMachines {
if slices.ContainsFunc(machines, func(mc *v1alpha1.Machine) bool {
return mc.Name == machineName
}) {
updatedMarkedMachines = append(updatedMarkedMachines, machineName)
}
}
clone := mcd.DeepCopy()
clone.Annotations[machinesMarkedByCAForDeletion] = strings.Join(updatedMarkedMachines, ",")
ctx, cancelFn := context.WithTimeout(context.Background(), machineDeployment.mcmManager.maxRetryTimeout)
defer cancelFn()
_, err = machineDeployment.mcmManager.machineClient.MachineDeployments(machineDeployment.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return err
}
return machineDeployment.mcmManager.resetPriorityForMachines(incorrectlyMarkedMachines)
}

// Belongs returns true if the given node belongs to the NodeGroup.
// TODO: Implement this to iterate over machines under machinedeployment, and return true if node exists in list.
func (machinedeployment *MachineDeployment) Belongs(node *apiv1.Node) (bool, error) {
Expand Down
89 changes: 24 additions & 65 deletions cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,45 +414,9 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeplo
// Refresh method, for each machine deployment, will reset the priority of the machines if the number of annotated machines is more than desired.
// It will select the machines to reset the priority based on the descending order of creation timestamp.
func (m *McmManager) Refresh() error {
machineDeployments, err := m.machineDeploymentLister.MachineDeployments(m.namespace).List(labels.Everything())
if err != nil {
klog.Errorf("[Refresh] unable to list machine deployments")
return err
}
var collectiveError error
for _, machineDeployment := range machineDeployments {
// ignore the machine deployment if it is in rolling update
if !isRollingUpdateFinished(machineDeployment) {
klog.Infof("[Refresh] machine deployment %s is under rolling update, skipping", machineDeployment.Name)
continue
}
mcd, ok := m.machineDeployments[types.NamespacedName{Namespace: m.namespace, Name: machineDeployment.Name}]
if !ok {
klog.Errorf("[Refresh] machine deployment %s not found in the list of machine deployments", machineDeployment.Name)
continue
}
mcd.scalingMutex.Lock()
markedMachines := sets.New(strings.Split(machineDeployment.Annotations[machinesMarkedByCAForDeletion], ",")...)
// check if number of annotated machine objects is more than desired and correspondingly reset the priority annotation value if needed.
machines, err := m.getMachinesForMachineDeployment(machineDeployment.Name)
if err != nil {
klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
collectiveError = errors.Join(collectiveError, err)
mcd.scalingMutex.Unlock()
continue
}
var incorrectlyMarkedMachines []*Ref
for _, machine := range machines {
// no need to reset priority for machines already in termination or failed phase
if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {
continue
}
if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) {
incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &Ref{Name: machine.Name, Namespace: machine.Namespace})
}
}
collectiveError = errors.Join(collectiveError, m.resetPriorityForMachines(incorrectlyMarkedMachines))
mcd.scalingMutex.Unlock()
for _, machineDeployment := range m.machineDeployments {
collectiveError = errors.Join(collectiveError, machineDeployment.Refresh())
}
return collectiveError
}
Expand Down Expand Up @@ -512,28 +476,18 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
}
markedMachines := sets.New(strings.Split(md.Annotations[machinesMarkedByCAForDeletion], ",")...)
var filteredTargetMachineRefs []*Ref
for _, targetMachineRef := range targetMachineRefs {
if !markedMachines.Has(targetMachineRef.Name) {
filteredTargetMachineRefs = append(filteredTargetMachineRefs, targetMachineRef)
markedMachines.Insert(targetMachineRef.Name)
} else {
klog.Infof("Machine %s is already marked for deletion, skipping", targetMachineRef.Name)
}
}

// update priorities of machines to be deleted except the ones already in termination to 1
err = m.prioritizeMachinesForDeletion(filteredTargetMachineRefs)
machinesWithPrio1, err := m.prioritizeMachinesForDeletion(targetMachineRefs)
if err != nil {
return err
}
markedMachines.Insert(machinesWithPrio1...)
// Trying to update the machineDeployment till the deadline
err = m.retry(func(ctx context.Context) (bool, error) {
return m.scaleDownMachineDeployment(ctx, commonMachineDeployment.Name, len(filteredTargetMachineRefs), strings.Join(markedMachines.UnsortedList(), ","))
return m.scaleDownAndAnnotateMachineDeployment(ctx, commonMachineDeployment.Name, len(machinesWithPrio1), strings.Join(markedMachines.UnsortedList(), ","))
}, "MachineDeployment", "update", commonMachineDeployment.Name)
if err != nil {
klog.Errorf("unable to scale in machine deployment %s, will reset priority of target machines, Error: %v", commonMachineDeployment.Name, err)
return errors.Join(err, m.resetPriorityForMachines(filteredTargetMachineRefs))
klog.Errorf("unable to scale in machine deployment %s, Error: %v", commonMachineDeployment.Name, err)
}
return nil
}
Expand All @@ -543,6 +497,10 @@ func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error {
var collectiveError error
for _, mcRef := range mcRefs {
machine, err := m.machineLister.Machines(m.namespace).Get(mcRef.Name)
if kube_errors.IsNotFound(err) {
klog.Warningf("Machine %s not found, skipping resetting priority annotation", mcRef.Name)
continue
}
if err != nil {
collectiveError = errors.Join(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcRef, err))
continue
Expand All @@ -566,8 +524,9 @@ func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error {
}

// prioritizeMachinesForDeletion prioritizes the targeted machines by updating their priority annotation to 1
func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) error {
func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) ([]string, error) {
var expectedToTerminateMachineNodePairs = make(map[string]string)
var machinesMarkedWithPrio1 []string
for _, machineRef := range targetMachineRefs {
// Trying to update the priority of machineRef till m.maxRetryTimeout
if err := m.retry(func(ctx context.Context) (bool, error) {
Expand All @@ -583,15 +542,20 @@ func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) err
if isMachineFailedOrTerminating(mc) {
return false, nil
}
if mc.Annotations[machinePriorityAnnotation] == priorityValueForCandidateMachines {
klog.Infof("Machine %q priority is already set to 1, hence skipping the update", mc.Name)
return false, nil
}
machinesMarkedWithPrio1 = append(machinesMarkedWithPrio1, machineRef.Name)
expectedToTerminateMachineNodePairs[mc.Name] = mc.Labels["node"]
return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, priorityValueForCandidateMachines)
}, "Machine", "update", machineRef.Name); err != nil {
klog.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
return fmt.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
return nil, fmt.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
}
}
klog.V(2).Infof("Expected to remove following {machineRef: corresponding node} pairs %s", expectedToTerminateMachineNodePairs)
return nil
return machinesMarkedWithPrio1, nil
}

// updateAnnotationOnMachine returns error only when updating the annotations on machine has been failing consequently and deadline is crossed
Expand All @@ -606,25 +570,20 @@ func (m *McmManager) updateAnnotationOnMachine(ctx context.Context, mcName strin
return true, err
}
clone := machine.DeepCopy()
if clone.Annotations != nil {
if clone.Annotations[key] == val {
klog.Infof("Machine %q priority is already set to 1, hence skipping the update", machine.Name)
return false, nil
}
clone.Annotations[key] = val
} else {
if clone.Annotations == nil {
clone.Annotations = make(map[string]string)
clone.Annotations[key] = val
}
clone.Annotations[key] = val
_, err = m.machineClient.Machines(machine.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err == nil {
klog.Infof("Machine %s marked with priority %s successfully", mcName, val)
}
return true, err
}

// scaleDownMachineDeployment scales down the machine deployment by the provided scaleDownAmount and returns the updated spec.Replicas after scale down.
func (m *McmManager) scaleDownMachineDeployment(ctx context.Context, mdName string, scaleDownAmount int, markedMachines string) (bool, error) {
// scaleDownAndAnnotateMachineDeployment scales down the machine deployment by the provided scaleDownAmount and returns the updated spec.Replicas after scale down.
// It also updates the machines-marked-by-ca-for-deletion annotation on the machine deployment with the list of existing machines marked for deletion.
func (m *McmManager) scaleDownAndAnnotateMachineDeployment(ctx context.Context, mdName string, scaleDownAmount int, markedMachines string) (bool, error) {
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %v", mdName, err)
Expand Down

0 comments on commit 56d80ac

Please sign in to comment.