diff --git a/.github/workflows/module_controller_ci_build_deploy_to_aliyun.yml b/.github/workflows/module_controller_ci_build_deploy_to_aliyun.yml index db21e3cb1..c65e502a7 100644 --- a/.github/workflows/module_controller_ci_build_deploy_to_aliyun.yml +++ b/.github/workflows/module_controller_ci_build_deploy_to_aliyun.yml @@ -176,6 +176,7 @@ jobs: exit 0 else echo "等待字段值满足条件..." + echo "期望状态是: $desired_field_value, 当前状态是: $field_value" sleep 5 # 等待一段时间后再次检查 fi done @@ -487,4 +488,4 @@ jobs: fi sleep 5 fi - done \ No newline at end of file + done diff --git a/module-controller/api/v1alpha1/moduledeployment_types.go b/module-controller/api/v1alpha1/moduledeployment_types.go index 34ebb55f9..f47b0237f 100644 --- a/module-controller/api/v1alpha1/moduledeployment_types.go +++ b/module-controller/api/v1alpha1/moduledeployment_types.go @@ -48,7 +48,8 @@ const ( ModuleDeploymentReleaseProgressPaused ReleaseProgress = "Paused" ModuleDeploymentReleaseProgressCompleted ReleaseProgress = "Completed" ModuleDeploymentReleaseProgressAborted ReleaseProgress = "Aborted" - ModuleDeploymentReleaseProgressTermed ReleaseProgress = "Terminated" + ModuleDeploymentReleaseProgressTerminating ReleaseProgress = "Terminating" + ModuleDeploymentReleaseProgressTerminated ReleaseProgress = "Terminated" ) type ModuleUpgradeType string @@ -120,7 +121,7 @@ type ModuleOperationStrategy struct { BatchCount int32 `json:"batchCount,omitempty"` - MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` + MaxUnavailable int32 `json:"maxUnavailable,omitempty"` GrayTimeBetweenBatchSeconds int32 `json:"grayTimeBetweenBatchSeconds,omitempty"` diff --git a/module-controller/api/v1alpha1/zz_generated.deepcopy.go b/module-controller/api/v1alpha1/zz_generated.deepcopy.go index 50a94624e..8fed3a0b8 100644 --- a/module-controller/api/v1alpha1/zz_generated.deepcopy.go +++ b/module-controller/api/v1alpha1/zz_generated.deepcopy.go @@ -23,7 +23,6 @@ package v1alpha1 import ( runtime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/intstr" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -132,7 +131,7 @@ func (in *ModuleDeploymentList) DeepCopyObject() runtime.Object { func (in *ModuleDeploymentSpec) DeepCopyInto(out *ModuleDeploymentSpec) { *out = *in in.Template.DeepCopyInto(&out.Template) - in.OperationStrategy.DeepCopyInto(&out.OperationStrategy) + out.OperationStrategy = in.OperationStrategy out.SchedulingStrategy = in.SchedulingStrategy } @@ -228,11 +227,6 @@ func (in *ModuleList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ModuleOperationStrategy) DeepCopyInto(out *ModuleOperationStrategy) { *out = *in - if in.MaxUnavailable != nil { - in, out := &in.MaxUnavailable, &out.MaxUnavailable - *out = new(intstr.IntOrString) - **out = **in - } out.ServiceStrategy = in.ServiceStrategy } @@ -310,7 +304,7 @@ func (in *ModuleReplicaSetSpec) DeepCopyInto(out *ModuleReplicaSetSpec) { *out = *in in.Selector.DeepCopyInto(&out.Selector) in.Template.DeepCopyInto(&out.Template) - in.OperationStrategy.DeepCopyInto(&out.OperationStrategy) + out.OperationStrategy = in.OperationStrategy out.SchedulingStrategy = in.SchedulingStrategy } diff --git a/module-controller/config/crd/bases/serverless.alipay.com_moduledeployments.yaml b/module-controller/config/crd/bases/serverless.alipay.com_moduledeployments.yaml index 27bf6ebd0..1fe788c00 100644 --- a/module-controller/config/crd/bases/serverless.alipay.com_moduledeployments.yaml +++ b/module-controller/config/crd/bases/serverless.alipay.com_moduledeployments.yaml @@ -53,10 +53,8 @@ spec: format: int32 type: integer maxUnavailable: - anyOf: - - type: integer - - type: string - x-kubernetes-int-or-string: true + format: int32 + type: integer needConfirm: type: boolean serviceStrategy: diff --git a/module-controller/config/crd/bases/serverless.alipay.com_modulereplicasets.yaml b/module-controller/config/crd/bases/serverless.alipay.com_modulereplicasets.yaml index 2c6cc0e3b..adf8e3739 100644 --- a/module-controller/config/crd/bases/serverless.alipay.com_modulereplicasets.yaml +++ b/module-controller/config/crd/bases/serverless.alipay.com_modulereplicasets.yaml @@ -48,10 +48,8 @@ spec: format: int32 type: integer maxUnavailable: - anyOf: - - type: integer - - type: string - x-kubernetes-int-or-string: true + format: int32 + type: integer needConfirm: type: boolean serviceStrategy: diff --git a/module-controller/internal/controller/moduledeployment_controller.go b/module-controller/internal/controller/moduledeployment_controller.go index 49ea46b7a..9361768cb 100644 --- a/module-controller/internal/controller/moduledeployment_controller.go +++ b/module-controller/internal/controller/moduledeployment_controller.go @@ -82,9 +82,15 @@ func (r *ModuleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req } if moduleDeployment.DeletionTimestamp != nil { - // delete moduleDeployment event.PublishModuleDeploymentDeleteEvent(r.Client, ctx, moduleDeployment) - return r.handleDeletingModuleDeployment(ctx, moduleDeployment) + if !utils.HasFinalizer(&moduleDeployment.ObjectMeta, finalizer.ModuleReplicaSetExistedFinalizer) && + !utils.HasFinalizer(&moduleDeployment.ObjectMeta, finalizer.ModuleExistedFinalizer) { + if moduleDeployment.Status.ReleaseStatus.Progress != v1alpha1.ModuleDeploymentReleaseProgressTerminated { + moduleDeployment.Status.ReleaseStatus.Progress = v1alpha1.ModuleDeploymentReleaseProgressTerminated + return ctrl.Result{}, r.Status().Update(ctx, moduleDeployment) + } + return ctrl.Result{}, nil + } } if moduleDeployment.Generation == 1 { @@ -129,6 +135,10 @@ func (r *ModuleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req case v1alpha1.ModuleDeploymentReleaseProgressExecuting: return r.updateModuleReplicaSet(ctx, moduleDeployment, newRS) case v1alpha1.ModuleDeploymentReleaseProgressCompleted: + if moduleDeployment.DeletionTimestamp != nil { + moduleDeployment.Status.ReleaseStatus.Progress = v1alpha1.ModuleDeploymentReleaseProgressTerminating + return ctrl.Result{}, r.Status().Update(ctx, moduleDeployment) + } if moduleDeployment.Spec.Replicas != newRS.Spec.Replicas { moduleDeployment.Status.ReleaseStatus.Progress = v1alpha1.ModuleDeploymentReleaseProgressInit log.Log.Info("update release status progress to init when complete moduleDeployment", "moduleDeploymentName", moduleDeployment.Name) @@ -161,6 +171,27 @@ func (r *ModuleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req return ctrl.Result{}, utils.Error(err, "update moduleDeployment progress from paused to executing failed") } } + case v1alpha1.ModuleDeploymentReleaseProgressTerminating: + // delete modules + if utils.HasFinalizer(&moduleDeployment.ObjectMeta, finalizer.ModuleExistedFinalizer) { + if moduleDeployment.Spec.Replicas != 0 { + moduleDeployment.Spec.Replicas = 0 + return ctrl.Result{}, r.Update(ctx, moduleDeployment) + } + if newRS.Status.Replicas != 0 { + handleInitModuleDeployment(moduleDeployment, newRS) + return ctrl.Result{}, r.Status().Update(ctx, moduleDeployment) + } + utils.RemoveFinalizer(&moduleDeployment.ObjectMeta, finalizer.ModuleExistedFinalizer) + return ctrl.Result{}, r.Update(ctx, moduleDeployment) + } + + // delete module replicaset + if utils.HasFinalizer(&moduleDeployment.ObjectMeta, finalizer.ModuleReplicaSetExistedFinalizer) { + return r.handleDeletingModuleDeployment(ctx, moduleDeployment) + } + case v1alpha1.ModuleDeploymentReleaseProgressTerminated: + return ctrl.Result{}, nil } return ctrl.Result{}, nil } @@ -190,10 +221,6 @@ func handleInitModuleDeployment(moduleDeployment *v1alpha1.ModuleDeployment, new // handle deleting module deployment func (r *ModuleDeploymentReconciler) handleDeletingModuleDeployment(ctx context.Context, moduleDeployment *v1alpha1.ModuleDeployment) (ctrl.Result, error) { - if !utils.HasFinalizer(&moduleDeployment.ObjectMeta, finalizer.ModuleReplicaSetExistedFinalizer) { - return ctrl.Result{}, nil - } - existReplicaset := true set := map[string]string{ label.ModuleDeploymentLabel: moduleDeployment.Name, @@ -218,6 +245,7 @@ func (r *ModuleDeploymentReconciler) handleDeletingModuleDeployment(ctx context. return ctrl.Result{}, utils.Error(err, "Failed to delete moduleReplicaSet", "moduleReplicaSetName", replicaSetList.Items[i].Name) } } + requeueAfter := utils.GetNextReconcileTime(moduleDeployment.DeletionTimestamp.Time) return ctrl.Result{RequeueAfter: requeueAfter}, nil } else { @@ -249,6 +277,7 @@ func (r *ModuleDeploymentReconciler) updateOwnerReference(ctx context.Context, m }) moduleDeployment.SetOwnerReferences(ownerReference) utils.AddFinalizer(&moduleDeployment.ObjectMeta, finalizer.ModuleReplicaSetExistedFinalizer) + utils.AddFinalizer(&moduleDeployment.ObjectMeta, finalizer.ModuleExistedFinalizer) err = utils.UpdateResource(r.Client, ctx, moduleDeployment) if err != nil { return utils.Error(err, "Failed to update moduleDeployment", "moduleDeploymentName", moduleDeployment.Name) @@ -304,6 +333,9 @@ func (r *ModuleDeploymentReconciler) createOrGetModuleReplicas(ctx context.Conte log.Log.Info("module has changed, need create a new replicaset") } + if moduleDeployment.DeletionTimestamp != nil { + return nil, nil, false, nil + } // create a new moduleReplicaset moduleReplicaSet, err := r.createNewReplicaSet(ctx, moduleDeployment, maxVersion+1) if err != nil { @@ -381,6 +413,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(ctx context.Context, } err := r.updateModuleReplicas(ctx, replicas, moduleDeployment, newRS) + if err != nil { return ctrl.Result{}, err } diff --git a/module-controller/internal/controller/moduledeployment_controller_operation_strategy_suit_test.go b/module-controller/internal/controller/moduledeployment_controller_operation_strategy_suit_test.go index a33e45a23..886127daf 100644 --- a/module-controller/internal/controller/moduledeployment_controller_operation_strategy_suit_test.go +++ b/module-controller/internal/controller/moduledeployment_controller_operation_strategy_suit_test.go @@ -2,19 +2,21 @@ package controller import ( "context" - "github.com/sofastack/sofa-serverless/internal/utils" - "sigs.k8s.io/controller-runtime/pkg/log" + "fmt" "time" + "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/log" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/sofastack/sofa-serverless/api/v1alpha1" "github.com/sofastack/sofa-serverless/internal/constants/label" + "github.com/sofastack/sofa-serverless/internal/utils" ) var _ = Describe("ModuleDeployment Controller OperationStrategy Test", func() { @@ -97,11 +99,11 @@ var _ = Describe("ModuleDeployment Controller OperationStrategy Test", func() { newModuleDeployment.Spec.Template.Spec.Module.Version = "1.0.1" Expect(k8sClient.Update(context.TODO(), &newModuleDeployment)).Should(Succeed()) - Eventually(func() bool { + Eventually(func() error { return checkModuleDeploymentReplicas( types.NamespacedName{Name: moduleDeploymentName, Namespace: namespace}, newModuleDeployment.Spec.Replicas) - }, timeout, interval).Should(BeTrue()) + }, timeout, interval).Should(Succeed()) }) }) @@ -189,9 +191,18 @@ var _ = Describe("ModuleDeployment Controller OperationStrategy Test", func() { } // when install module, the podIP is necessary pod.Status.PodIP = "127.0.0.1" - return k8sClient.Status().Update(context.TODO(), &pod) == nil - }, timeout, interval).Should(BeTrue()) + if k8sClient.Status().Update(context.TODO(), &pod) != nil { + return false + } + pod2 := preparePod(namespace, "fake-pod-4") + if err := k8sClient.Create(context.TODO(), &pod2); err != nil { + return false + } + // when install module, the podIP is necessary + pod.Status.PodIP = "127.0.0.1" + return k8sClient.Status().Update(context.TODO(), &pod2) == nil + }, timeout, interval).Should(BeTrue()) }) It("1. create a new moduleDeployment", func() { @@ -200,20 +211,20 @@ var _ = Describe("ModuleDeployment Controller OperationStrategy Test", func() { It("2. check if the replicas is 1", func() { // todo: we just check deployment.status.replicas rather than modulereplicaset - Eventually(func() bool { + Eventually(func() error { if err := k8sClient.Get(context.TODO(), nn, &moduleDeployment); err != nil { - return false + return err } if !moduleDeployment.Spec.Pause { - return false + return fmt.Errorf("the deployment is not paused") } return checkModuleDeploymentReplicas( types.NamespacedName{ Name: moduleDeploymentName, Namespace: moduleDeployment.Namespace}, 1) - }, timeout, interval).Should(BeTrue()) + }, timeout, interval).Should(Succeed()) }) It("3. resume", func() { @@ -239,9 +250,68 @@ var _ = Describe("ModuleDeployment Controller OperationStrategy Test", func() { }, timeout, interval).Should(BeTrue()) }) - It("5. delete moduleDeployment", func() { + It("5. add another finalizer to prevent module-deployment from being deleted ", func() { + utils.AddFinalizer(&moduleDeployment.ObjectMeta, "test") + Expect(k8sClient.Update(context.TODO(), &moduleDeployment)).Should(Succeed()) + }) + + It("6. delete moduleDeployment", func() { Expect(k8sClient.Delete(context.TODO(), &moduleDeployment)).Should(Succeed()) }) + + It("7. check if the replicas is 1", func() { + // todo: we just check deployment.status.replicas rather than modulereplicaset + Eventually(func() error { + if err := k8sClient.Get(context.TODO(), nn, &moduleDeployment); err != nil { + return err + } + + if !moduleDeployment.Spec.Pause { + return fmt.Errorf("the deployment is not paused") + } + + return checkModuleDeploymentReplicas( + types.NamespacedName{ + Name: moduleDeploymentName, + Namespace: moduleDeployment.Namespace}, 1) + }, timeout, interval).Should(Succeed()) + }) + + It("8. resume", func() { + Eventually(func() bool { + Expect(k8sClient.Get(context.TODO(), nn, &moduleDeployment)).Should(Succeed()) + + moduleDeployment.Spec.Pause = false + return Expect(k8sClient.Update(context.TODO(), &moduleDeployment)).Should(Succeed()) + }, timeout, interval).Should(BeTrue()) + }) + + It("9. check if the moduleDeployment status is Terminated", func() { + Eventually(func() error { + if err := k8sClient.Get(context.TODO(), nn, &moduleDeployment); err != nil { + return err + } + + if moduleDeployment.Spec.Pause != false { + return fmt.Errorf("the module-deployment is paused") + } + + if moduleDeployment.Status.ReleaseStatus == nil { + return fmt.Errorf("release status is nil") + } + + if moduleDeployment.Status.ReleaseStatus.Progress != v1alpha1.ModuleDeploymentReleaseProgressTerminated { + return fmt.Errorf("expect status %v, but got %v", + v1alpha1.ModuleDeploymentReleaseProgressTerminated, moduleDeployment.Status.ReleaseStatus.Progress) + } + return nil + }, timeout, interval).Should(Succeed()) + }) + + It("10. clean module-deployment", func() { + utils.RemoveFinalizer(&moduleDeployment.ObjectMeta, "test") + Expect(k8sClient.Update(context.TODO(), &moduleDeployment)) + }) }) Context("test useBeta strategy", func() { @@ -271,7 +341,7 @@ var _ = Describe("ModuleDeployment Controller OperationStrategy Test", func() { }) It("2. check if use Beta strategy", func() { - Eventually(func() bool { + Eventually(func() error { return checkModuleDeploymentReplicas(nn, 1) }) }) @@ -279,7 +349,6 @@ var _ = Describe("ModuleDeployment Controller OperationStrategy Test", func() { It("3. clean environment", func() { Expect(k8sClient.Delete(context.TODO(), &moduleDeployment)).Should(Succeed()) }) - }) Context("delete module deployment", func() { @@ -310,14 +379,14 @@ var _ = Describe("ModuleDeployment Controller OperationStrategy Test", func() { }) }) -func checkModuleDeploymentReplicas(nn types.NamespacedName, replicas int32) bool { +func checkModuleDeploymentReplicas(nn types.NamespacedName, replicas int32) error { set := map[string]string{ label.ModuleDeploymentLabel: nn.Name, } replicaSetList := &v1alpha1.ModuleReplicaSetList{} err := k8sClient.List(context.TODO(), replicaSetList, &client.ListOptions{LabelSelector: labels.SelectorFromSet(set)}, client.InNamespace(nn.Namespace)) if err != nil || len(replicaSetList.Items) == 0 { - return false + return fmt.Errorf("the replicasetList is empty") } maxVersion := 0 @@ -325,7 +394,7 @@ func checkModuleDeploymentReplicas(nn types.NamespacedName, replicas int32) bool for i := 0; i < len(replicaSetList.Items); i++ { version, err := getRevision(&replicaSetList.Items[i]) if err != nil { - return false + return err } if version > maxVersion { maxVersion = version @@ -334,10 +403,18 @@ func checkModuleDeploymentReplicas(nn types.NamespacedName, replicas int32) bool } // the replicas of new replicaset must be equal to newModuleDeployment - log.Log.Info("checkModuleDeploymentReplicas", "newRS.Status.Replicas", newRS.Status.Replicas, "newRS.Spec.Replicas", newRS.Spec.Replicas, "replicas", replicas) - return newRS != nil && - newRS.Status.Replicas == newRS.Spec.Replicas && - newRS.Status.Replicas == replicas + if newRS == nil { + return fmt.Errorf("the replicaset is nil") + } + if newRS.Status.Replicas != newRS.Spec.Replicas { + return fmt.Errorf("the replicaset is not ready, expect replicas is %v, but got %v", + newRS.Spec.Replicas, newRS.Status.ReadyReplicas) + } + if newRS.Spec.Replicas != replicas { + return fmt.Errorf("the deployment is not ready, expect replicas is %v, but got %v", + replicas, newRS.Spec.Replicas) + } + return nil } func waitModuleDeploymentCompleted(moduleDeploymentName string, namespace string) { diff --git a/module-controller/internal/controller/modulereplicaset_controller.go b/module-controller/internal/controller/modulereplicaset_controller.go index d047a2f07..bb5afcb17 100644 --- a/module-controller/internal/controller/modulereplicaset_controller.go +++ b/module-controller/internal/controller/modulereplicaset_controller.go @@ -19,10 +19,11 @@ package controller import ( "context" "fmt" - "github.com/sofastack/sofa-serverless/internal/event" "sort" "strconv" + "github.com/sofastack/sofa-serverless/internal/event" + "k8s.io/apimachinery/pkg/selection" "golang.org/x/tools/container/intsets"