From 5dae4468f478140f3d5e5b54acc6ec76b9a066fb Mon Sep 17 00:00:00 2001 From: charlie Date: Sun, 20 Aug 2023 14:57:20 +0800 Subject: [PATCH] feat: support batchConfirm && GrayTime Signed-off-by: charlie --- module-controller/cmd/main.go | 15 ++-- .../controller/moduledeployment_controller.go | 77 ++++++++++++++++++- 2 files changed, 85 insertions(+), 7 deletions(-) diff --git a/module-controller/cmd/main.go b/module-controller/cmd/main.go index 771a43d0e..bd00a0104 100644 --- a/module-controller/cmd/main.go +++ b/module-controller/cmd/main.go @@ -22,13 +22,14 @@ import ( "go.uber.org/zap/zapcore" "os" - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) - // to ensure that exec-entrypoint and run can make use of them. - _ "k8s.io/client-go/plugin/pkg/client/auth" - "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/util/workqueue" + + // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) + // to ensure that exec-entrypoint and run can make use of them. + _ "k8s.io/client-go/plugin/pkg/client/auth" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -92,8 +93,10 @@ func main() { } if err = (&controller.ModuleDeploymentReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + DelayingQueue: workqueue.NewDelayingQueue(), + Set: map[string][]int32{}, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ModuleDeployment") os.Exit(1) diff --git a/module-controller/internal/controller/moduledeployment_controller.go b/module-controller/internal/controller/moduledeployment_controller.go index 471851dcf..8f30998c5 100644 --- a/module-controller/internal/controller/moduledeployment_controller.go +++ b/module-controller/internal/controller/moduledeployment_controller.go @@ -20,6 +20,8 @@ import ( "context" "fmt" "strconv" + "strings" + "sync" "time" v1 "k8s.io/api/apps/v1" @@ -29,6 +31,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -44,7 +48,10 @@ import ( // ModuleDeploymentReconciler reconciles a ModuleDeployment object type ModuleDeploymentReconciler struct { client.Client - Scheme *runtime.Scheme + Scheme *runtime.Scheme + DelayingQueue workqueue.DelayingInterface + Set map[string][]int32 + mutex sync.Mutex } //+kubebuilder:rbac:groups=serverless.alipay.com,resources=moduledeployments,verbs=get;list;watch;create;update;patch;delete @@ -126,6 +133,21 @@ func (r *ModuleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req return ctrl.Result{}, err } } + case moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressWaitingForConfirmation: + moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressPaused + if err := r.Status().Update(ctx, moduleDeployment); err != nil { + return ctrl.Result{}, err + } + case moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressPaused: + moduleDeployment.Spec.Pause = true + if err := r.Update(ctx, moduleDeployment); err != nil { + return ctrl.Result{}, err + } + + moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressExecuting + if err := r.Status().Update(ctx, moduleDeployment); err != nil { + return ctrl.Result{}, err + } } // update moduleDeployment owner reference @@ -432,8 +454,61 @@ func (r *ModuleDeploymentReconciler) createNewReplicaSet(ctx context.Context, mo return moduleReplicaSet, nil } +type waitingLoop struct { + *ModuleDeploymentReconciler +} + +func (w *waitingLoop) Start(ctx context.Context) error { + for i := 0; i < 3; i++ { + go wait.Until(w.worker, time.Second, ctx.Done()) + } + return nil +} + +func (w *waitingLoop) worker() { + for w.processItem() { + } +} + +func (w *waitingLoop) processItem() bool { + item, shutdown := w.DelayingQueue.Get() + if shutdown { + return false + } + + defer w.DelayingQueue.Done(item) + + _ = w.HandleDelayingItem(item.(string)) + return true +} + +func (r *ModuleDeploymentReconciler) HandleDelayingItem(key string) error { + arr := strings.Split(key, "/") + if len(arr) != 2 { + return fmt.Errorf("invalid key %v", key) + } + moduleDeployment := &moduledeploymentv1alpha1.ModuleDeployment{} + name, namespace := arr[0], arr[1] + err := r.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, moduleDeployment) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + log.Log.Error(err, "get moduleDeployment failed") + return err + } + if moduleDeployment.Spec.Pause { + moduleDeployment.Spec.Pause = false + if err = r.Update(context.TODO(), moduleDeployment); err != nil { + log.Log.Error(err, "update moduleDeployment failed") + } + } + return err +} + // SetupWithManager sets up the controller with the Manager. func (r *ModuleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { + mgr.Add(&waitingLoop{r}) return ctrl.NewControllerManagedBy(mgr). For(&moduledeploymentv1alpha1.ModuleDeployment{}). Complete(r)