Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
test: add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: charlie <[email protected]>
  • Loading branch information
Charlie17Li committed Sep 2, 2023
1 parent dd38a22 commit 2306118
Show file tree
Hide file tree
Showing 3 changed files with 435 additions and 36 deletions.
19 changes: 19 additions & 0 deletions module-controller/internal/constants/label/well_known_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package label

const (
ModuleNameLabel = "serverless.alipay.com/module-name"

ModuleVersionLabel = "serverless.alipay.com/module-version"

BaseInstanceIpLabel = "serverless.alipay.com/base-instance-ip"

BaseInstanceNameLabel = "serverless.alipay.com/base-instance-name"

ModuleReplicasetLabel = "serverless.alipay.com/module-replicaset"

ModuleDeploymentLabel = "serverless.alipay.com/module-deployment"

DeleteModuleLabel = "serverless.alipay.com/delete-module"

ModuleInstanceCount = "serverless.alipay.com/module-instance-count"
)
232 changes: 196 additions & 36 deletions module-controller/internal/controller/modulereplicaset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ package controller
import (
"context"
"fmt"
"sort"
"strconv"

"github.com/hashicorp/go-multierror"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/sofastack/sofa-serverless/internal/constants/label"
"github.com/sofastack/sofa-serverless/internal/controller/utils"

"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -123,48 +125,18 @@ func (r *ModuleReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl.Req
// compare replicas
if int(moduleReplicaSet.Spec.Replicas) != len(existedModuleList.Items) {
// replicas change
selector, err := metav1.LabelSelectorAsSelector(&moduleReplicaSet.Spec.Selector)
selectedPods := &corev1.PodList{}
if err = r.List(ctx, selectedPods, &client.ListOptions{Namespace: req.Namespace, LabelSelector: selector}); err != nil {
log.Log.Error(err, "Failed to list pod", "moduleReplicaSetName", moduleReplicaSet.Name)
return reconcile.Result{}, nil
}

deltaReplicas := int(moduleReplicaSet.Spec.Replicas) - len(existedModuleList.Items)
limit := moduleReplicaSet.Spec.Template.Spec.Scheduling.MaxModuleCount
strategy := moduleReplicaSet.Spec.Template.Spec.Scheduling.Strategy

if deltaReplicas > 0 {
// scale up
toAllocatePod := utils.ScaleUp(selectedPods, existedModuleList, deltaReplicas, limit, strategy)

for _, pod := range toAllocatePod {
pod.Labels[fmt.Sprintf("%s/%s", moduledeploymentv1alpha1.ModuleNameLabel, moduleReplicaSet.Spec.Template.Spec.Module.Name)] = moduleReplicaSet.Spec.Template.Spec.Module.Version
err := r.Client.Update(ctx, &pod)
// TODO add pod finalizer
if err != nil {
// update pod label
return ctrl.Result{}, err
}
// create module
module := r.generateModule(moduleReplicaSet, pod)
if err = r.Client.Create(ctx, module); err != nil {
log.Log.Error(err, "Failed to create module", "moduleName", module.Name)
return reconcile.Result{}, nil
}
err = r.scaleup(existedModuleList, moduleReplicaSet)
if err != nil {
return reconcile.Result{}, err
}
} else {
// scale down
count := -deltaReplicas
log.Log.Info("scale down replicas", "deltaReplicas", deltaReplicas)
DeletedModules := utils.ScaleDown(selectedPods, existedModuleList, count, strategy)
for _, module := range DeletedModules {
module.Labels[moduledeploymentv1alpha1.DeleteModuleLabel] = "true"
err = multierror.Append(err, r.Client.Update(ctx, &module))
}
err = r.scaledown(existedModuleList, moduleReplicaSet)
if err != nil {
log.Log.Error(err, "Failed to delete module")
return ctrl.Result{}, err
return reconcile.Result{}, err
}
}
}
Expand Down Expand Up @@ -238,3 +210,191 @@ func (r *ModuleReplicaSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&moduledeploymentv1alpha1.ModuleReplicaSet{}).
Complete(r)
}

// scale up module
func (r *ModuleReplicaSetReconciler) scaleup(existedModuleList *moduledeploymentv1alpha1.ModuleList, moduleReplicaSet *moduledeploymentv1alpha1.ModuleReplicaSet) error {
selector, err := metav1.LabelSelectorAsSelector(&moduleReplicaSet.Spec.Selector)
selectedPods := &corev1.PodList{}
if err = r.List(context.TODO(), selectedPods, &client.ListOptions{Namespace: moduleReplicaSet.Namespace, LabelSelector: selector}); err != nil {
log.Log.Error(err, "Failed to list pod", "moduleReplicaSetName", moduleReplicaSet.Name)
return err
}

toAllocatePod := r.getScaleUpCandidatePods(existedModuleList, selectedPods, moduleReplicaSet)
for _, pod := range toAllocatePod {
pod.Labels[fmt.Sprintf("%s-%s", label.ModuleNameLabel, moduleReplicaSet.Spec.Template.Spec.Module.Name)] = moduleReplicaSet.Spec.Template.Spec.Module.Version
if _, exist := pod.Labels[label.ModuleInstanceCount]; exist {
count, err := strconv.Atoi(pod.Labels[label.ModuleInstanceCount])
if err != nil {
log.Log.Error(err, "failed to update module count")
} else {
pod.Labels[label.ModuleInstanceCount] = strconv.Itoa(count + 1)
}
} else {
pod.Labels[label.ModuleInstanceCount] = "1"
}
err := r.Client.Update(context.TODO(), &pod)
// TODO add pod finalizer
if err != nil {
// update pod label
return err
}
// create module
module := r.generateModule(moduleReplicaSet, pod)
if err = r.Client.Create(context.TODO(), module); err != nil {
log.Log.Error(err, "Failed to create module", "moduleName", module.Name)
return err
}
}
return nil
}

// scale down module
func (r *ModuleReplicaSetReconciler) scaledown(existedModuleList *moduledeploymentv1alpha1.ModuleList, moduleReplicaSet *moduledeploymentv1alpha1.ModuleReplicaSet) error {
deltaReplicas := int(moduleReplicaSet.Spec.Replicas) - len(existedModuleList.Items)
count := -deltaReplicas
log.Log.Info("scale down replicas", "deltaReplicas", deltaReplicas)

selector, err := metav1.LabelSelectorAsSelector(&moduleReplicaSet.Spec.Selector)
selectedPods := &corev1.PodList{}
if err = r.List(context.TODO(), selectedPods, &client.ListOptions{Namespace: moduleReplicaSet.Namespace, LabelSelector: selector}); err != nil {
log.Log.Error(err, "Failed to list pod", "moduleReplicaSetName", moduleReplicaSet.Name)
return err
}
toDeletedModules := r.getScaleDownCandidateModules(existedModuleList, selectedPods, moduleReplicaSet)
for _, module := range toDeletedModules {
module.Labels[label.DeleteModuleLabel] = "true"
err = r.Client.Update(context.TODO(), &module)
if err != nil {
log.Log.Error(err, "Failed to delete module", "module", module)
}
if count--; count == 0 {
break
}
}

return err
}

// get the candidate pods used to install modules when scaling up
func (r *ModuleReplicaSetReconciler) getScaleUpCandidatePods(
existedModuleList *moduledeploymentv1alpha1.ModuleList,
selectedPods *corev1.PodList,
moduleReplicaSet *moduledeploymentv1alpha1.ModuleReplicaSet,
) []corev1.Pod {
deltaReplicas := int(moduleReplicaSet.Spec.Replicas) - len(existedModuleList.Items)
usedPodNames := make(map[string]bool)
for _, module := range existedModuleList.Items {
usedPodNames[module.Labels[label.BaseInstanceNameLabel]] = true
}

strategy := moduleReplicaSet.Spec.Template.Spec.Scheduling.Strategy
maxModuleCount := moduleReplicaSet.Spec.Template.Spec.Scheduling.MaxModuleCount

if strategy == moduledeploymentv1alpha1.Scatter {
sort.Slice(selectedPods.Items, func(i, j int) bool {
count_i, err := strconv.Atoi(selectedPods.Items[i].Labels[label.ModuleInstanceCount])
if err != nil {
return true
}
count_j, err := strconv.Atoi(selectedPods.Items[j].Labels[label.ModuleInstanceCount])
if err != nil {
return true
}

return count_i < count_j
})
} else if strategy == moduledeploymentv1alpha1.Stacking {
sort.Slice(selectedPods.Items, func(i, j int) bool {
count_i, err := strconv.Atoi(selectedPods.Items[i].Labels[label.ModuleInstanceCount])
if err != nil {
return true
}
count_j, err := strconv.Atoi(selectedPods.Items[j].Labels[label.ModuleInstanceCount])
if err != nil {
return true
}

return count_i > count_j
})
}

// allocate pod
var toAllocatePod []corev1.Pod
count := deltaReplicas
for _, pod := range selectedPods.Items {
instanceCount, err := strconv.Atoi(pod.Labels[label.ModuleInstanceCount])
if err != nil {
log.Log.Error(err, fmt.Sprintf("invalid ModuleInstanceCount in pod %v", pod.Name))
continue
}
if _, ok := usedPodNames[pod.Name]; !ok && instanceCount < maxModuleCount {
toAllocatePod = append(toAllocatePod, pod)
if count--; count == 0 {
break
}
}
}
return toAllocatePod
}

// get the candidate modules to be deleted when scaling down
func (r *ModuleReplicaSetReconciler) getScaleDownCandidateModules(
existedModuleList *moduledeploymentv1alpha1.ModuleList,
selectedPods *corev1.PodList,
moduleReplicaSet *moduledeploymentv1alpha1.ModuleReplicaSet,
) []moduledeploymentv1alpha1.Module {
deltaReplicas := int(moduleReplicaSet.Spec.Replicas) - len(existedModuleList.Items)
usedPodNames := make(map[string]int)
for idx, module := range existedModuleList.Items {
usedPodNames[module.Labels[label.BaseInstanceNameLabel]] = idx
}

var filteredPods []*corev1.Pod
for i := 0; i < len(selectedPods.Items); i++ {
if _, ok := usedPodNames[selectedPods.Items[i].Name]; ok {
filteredPods = append(filteredPods, &selectedPods.Items[i])
}
}

strategy := moduleReplicaSet.Spec.Template.Spec.Scheduling.Strategy

if strategy == moduledeploymentv1alpha1.Scatter {
sort.Slice(filteredPods, func(i, j int) bool {
count_i, err := strconv.Atoi(filteredPods[i].Labels[label.ModuleInstanceCount])
if err != nil {
return true
}
count_j, err := strconv.Atoi(filteredPods[j].Labels[label.ModuleInstanceCount])
if err != nil {
return true
}

return count_i > count_j
})
} else if strategy == moduledeploymentv1alpha1.Stacking {
sort.Slice(filteredPods, func(i, j int) bool {
count_i, err := strconv.Atoi(filteredPods[i].Labels[label.ModuleInstanceCount])
if err != nil {
return true
}
count_j, err := strconv.Atoi(filteredPods[j].Labels[label.ModuleInstanceCount])
if err != nil {
return true
}

return count_i < count_j
})
}

var candidateModules []moduledeploymentv1alpha1.Module
i := 0
count := -deltaReplicas
for count > 0 && i < len(filteredPods) {
idx := usedPodNames[filteredPods[i].Name]
candidateModules = append(candidateModules, existedModuleList.Items[idx])
count -= 1
i += 1
}
return candidateModules
}
Loading

0 comments on commit 2306118

Please sign in to comment.