From 68bd0ddb99962ebc09af5a36b6b2b6ba0c8795d7 Mon Sep 17 00:00:00 2001 From: charlie Date: Mon, 25 Sep 2023 19:18:08 +0800 Subject: [PATCH] fix: unit test --- module-controller/cmd/main.go | 14 ++-- .../controller/moduledeployment_controller.go | 75 ++----------------- 2 files changed, 12 insertions(+), 77 deletions(-) diff --git a/module-controller/cmd/main.go b/module-controller/cmd/main.go index bd00a0104..d29f13b98 100644 --- a/module-controller/cmd/main.go +++ b/module-controller/cmd/main.go @@ -18,15 +18,15 @@ package main import ( "flag" - "github.com/sofastack/sofa-serverless/internal/controller" - "go.uber.org/zap/zapcore" "os" + "go.uber.org/zap/zapcore" + + "github.com/sofastack/sofa-serverless/internal/controller" + "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/util/workqueue" - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" @@ -93,10 +93,8 @@ func main() { } if err = (&controller.ModuleDeploymentReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - DelayingQueue: workqueue.NewDelayingQueue(), - Set: map[string][]int32{}, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ModuleDeployment") os.Exit(1) diff --git a/module-controller/internal/controller/moduledeployment_controller.go b/module-controller/internal/controller/moduledeployment_controller.go index 29caf00e3..c7626cdc4 100644 --- a/module-controller/internal/controller/moduledeployment_controller.go +++ b/module-controller/internal/controller/moduledeployment_controller.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "strconv" - "strings" "sync" "time" @@ -31,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" @@ -117,15 +115,7 @@ func (r *ModuleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req return ctrl.Result{}, err } case moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressExecuting: - // update moduleReplicaSet - enqueue, err := r.updateModuleReplicaSet(moduleDeployment, newRS, oldRSs) - if err != nil { - return ctrl.Result{}, err - } - if enqueue { - requeueAfter := utils.GetNextReconcileTime(time.Now()) - return ctrl.Result{RequeueAfter: requeueAfter}, nil - } + return r.updateModuleReplicaSet(moduleDeployment, newRS, oldRSs) case moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressCompleted: if moduleDeployment.Spec.Replicas != newRS.Spec.Replicas { moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressExecuting @@ -344,7 +334,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicas( } func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(moduleDeployment *moduledeploymentv1alpha1.ModuleDeployment, - newRS *moduledeploymentv1alpha1.ModuleReplicaSet, oldRSs []*moduledeploymentv1alpha1.ModuleReplicaSet) (bool, error) { + newRS *moduledeploymentv1alpha1.ModuleReplicaSet, oldRSs []*moduledeploymentv1alpha1.ModuleReplicaSet) (ctrl.Result, error) { var ( ctx = context.TODO() @@ -362,7 +352,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(moduleDeployment *mo // wait moduleReplicaset ready if replicas := (curBatch - 1) * (moduleDeployment.Spec.Replicas / batchCount); replicas > curReplicas { log.Log.Info(fmt.Sprintf("newRs is not ready, expect replicas %v, but got %v", replicas, curReplicas)) - return true, nil + return ctrl.Result{Requeue: true, RequeueAfter: utils.GetNextReconcileTime(time.Now())}, nil } if curReplicas >= expReplicas { @@ -374,7 +364,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(moduleDeployment *mo LastTransitionTime: metav1.Now(), Message: "deployment release progress completed", }) - return false, r.Status().Update(ctx, moduleDeployment) + return ctrl.Result{}, r.Status().Update(ctx, moduleDeployment) } replicas := curBatch * (moduleDeployment.Spec.Replicas / batchCount) @@ -384,7 +374,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(moduleDeployment *mo err := r.updateModuleReplicas(ctx, replicas, moduleDeployment, newRS, oldRSs) if err != nil { - return false, err + return ctrl.Result{}, err } moduleDeployment.Status.ReleaseStatus.CurrentBatch += 1 @@ -411,7 +401,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(moduleDeployment *mo Message: fmt.Sprintf("deployment release: curbatch %v, batchCount %v", curBatch, batchCount), }) - return grayTime == 0, r.Status().Update(ctx, moduleDeployment) + return ctrl.Result{Requeue: true, RequeueAfter: time.Duration(grayTime) * time.Second}, r.Status().Update(ctx, moduleDeployment) } // generate module replicas @@ -468,61 +458,8 @@ func (r *ModuleDeploymentReconciler) createNewReplicaSet(ctx context.Context, mo return moduleReplicaSet, nil } -type waitingLoop struct { - *ModuleDeploymentReconciler -} - -func (w *waitingLoop) Start(ctx context.Context) error { - for i := 0; i < 3; i++ { - go wait.Until(w.worker, time.Second, ctx.Done()) - } - return nil -} - -func (w *waitingLoop) worker() { - for w.processItem() { - } -} - -func (w *waitingLoop) processItem() bool { - item, shutdown := w.DelayingQueue.Get() - if shutdown { - return false - } - - defer w.DelayingQueue.Done(item) - - _ = w.HandleDelayingItem(item.(string)) - return true -} - -func (r *ModuleDeploymentReconciler) HandleDelayingItem(key string) error { - arr := strings.Split(key, "/") - if len(arr) != 2 { - return fmt.Errorf("invalid key %v", key) - } - moduleDeployment := &moduledeploymentv1alpha1.ModuleDeployment{} - name, namespace := arr[0], arr[1] - err := r.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, moduleDeployment) - if err != nil { - if errors.IsNotFound(err) { - return nil - } - log.Log.Error(err, "get moduleDeployment failed") - return err - } - if moduleDeployment.Spec.Pause { - moduleDeployment.Spec.Pause = false - if err = r.Update(context.TODO(), moduleDeployment); err != nil { - log.Log.Error(err, "update moduleDeployment failed") - } - } - return err -} - // SetupWithManager sets up the controller with the Manager. func (r *ModuleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { - mgr.Add(&waitingLoop{r}) return ctrl.NewControllerManagedBy(mgr). For(&moduledeploymentv1alpha1.ModuleDeployment{}). Complete(r)