Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
fix: unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
Charlie17Li committed Sep 25, 2023
1 parent 4106469 commit 68bd0dd
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 77 deletions.
14 changes: 6 additions & 8 deletions module-controller/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ package main

import (
"flag"
"github.com/sofastack/sofa-serverless/internal/controller"
"go.uber.org/zap/zapcore"
"os"

"go.uber.org/zap/zapcore"

"github.com/sofastack/sofa-serverless/internal/controller"

"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/util/workqueue"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand Down Expand Up @@ -93,10 +93,8 @@ func main() {
}

if err = (&controller.ModuleDeploymentReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
DelayingQueue: workqueue.NewDelayingQueue(),
Set: map[string][]int32{},
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ModuleDeployment")
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -31,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -117,15 +115,7 @@ func (r *ModuleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, err
}
case moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressExecuting:
// update moduleReplicaSet
enqueue, err := r.updateModuleReplicaSet(moduleDeployment, newRS, oldRSs)
if err != nil {
return ctrl.Result{}, err
}
if enqueue {
requeueAfter := utils.GetNextReconcileTime(time.Now())
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
return r.updateModuleReplicaSet(moduleDeployment, newRS, oldRSs)
case moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressCompleted:
if moduleDeployment.Spec.Replicas != newRS.Spec.Replicas {
moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressExecuting
Expand Down Expand Up @@ -344,7 +334,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicas(
}

func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(moduleDeployment *moduledeploymentv1alpha1.ModuleDeployment,
newRS *moduledeploymentv1alpha1.ModuleReplicaSet, oldRSs []*moduledeploymentv1alpha1.ModuleReplicaSet) (bool, error) {
newRS *moduledeploymentv1alpha1.ModuleReplicaSet, oldRSs []*moduledeploymentv1alpha1.ModuleReplicaSet) (ctrl.Result, error) {
var (
ctx = context.TODO()

Expand All @@ -362,7 +352,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(moduleDeployment *mo
// wait moduleReplicaset ready
if replicas := (curBatch - 1) * (moduleDeployment.Spec.Replicas / batchCount); replicas > curReplicas {
log.Log.Info(fmt.Sprintf("newRs is not ready, expect replicas %v, but got %v", replicas, curReplicas))
return true, nil
return ctrl.Result{Requeue: true, RequeueAfter: utils.GetNextReconcileTime(time.Now())}, nil
}

if curReplicas >= expReplicas {
Expand All @@ -374,7 +364,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(moduleDeployment *mo
LastTransitionTime: metav1.Now(),
Message: "deployment release progress completed",
})
return false, r.Status().Update(ctx, moduleDeployment)
return ctrl.Result{}, r.Status().Update(ctx, moduleDeployment)
}

replicas := curBatch * (moduleDeployment.Spec.Replicas / batchCount)
Expand All @@ -384,7 +374,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(moduleDeployment *mo

err := r.updateModuleReplicas(ctx, replicas, moduleDeployment, newRS, oldRSs)
if err != nil {
return false, err
return ctrl.Result{}, err
}

moduleDeployment.Status.ReleaseStatus.CurrentBatch += 1
Expand All @@ -411,7 +401,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(moduleDeployment *mo
Message: fmt.Sprintf("deployment release: curbatch %v, batchCount %v", curBatch, batchCount),
})

return grayTime == 0, r.Status().Update(ctx, moduleDeployment)
return ctrl.Result{Requeue: true, RequeueAfter: time.Duration(grayTime) * time.Second}, r.Status().Update(ctx, moduleDeployment)
}

// generate module replicas
Expand Down Expand Up @@ -468,61 +458,8 @@ func (r *ModuleDeploymentReconciler) createNewReplicaSet(ctx context.Context, mo
return moduleReplicaSet, nil
}

type waitingLoop struct {
*ModuleDeploymentReconciler
}

func (w *waitingLoop) Start(ctx context.Context) error {
for i := 0; i < 3; i++ {
go wait.Until(w.worker, time.Second, ctx.Done())
}
return nil
}

func (w *waitingLoop) worker() {
for w.processItem() {
}
}

func (w *waitingLoop) processItem() bool {
item, shutdown := w.DelayingQueue.Get()
if shutdown {
return false
}

defer w.DelayingQueue.Done(item)

_ = w.HandleDelayingItem(item.(string))
return true
}

func (r *ModuleDeploymentReconciler) HandleDelayingItem(key string) error {
arr := strings.Split(key, "/")
if len(arr) != 2 {
return fmt.Errorf("invalid key %v", key)
}
moduleDeployment := &moduledeploymentv1alpha1.ModuleDeployment{}
name, namespace := arr[0], arr[1]
err := r.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, moduleDeployment)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
log.Log.Error(err, "get moduleDeployment failed")
return err
}
if moduleDeployment.Spec.Pause {
moduleDeployment.Spec.Pause = false
if err = r.Update(context.TODO(), moduleDeployment); err != nil {
log.Log.Error(err, "update moduleDeployment failed")
}
}
return err
}

// SetupWithManager sets up the controller with the Manager.
func (r *ModuleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
mgr.Add(&waitingLoop{r})
return ctrl.NewControllerManagedBy(mgr).
For(&moduledeploymentv1alpha1.ModuleDeployment{}).
Complete(r)
Expand Down

0 comments on commit 68bd0dd

Please sign in to comment.