From 3af34aaa5760b161f1921c968a2f82268aabe610 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Thu, 12 Sep 2024 15:28:24 +0800 Subject: [PATCH] Support pause rollout --- controllers/function_controller.go | 9 +++++++-- controllers/sink_controller.go | 9 +++++++-- controllers/source_controller.go | 9 +++++++-- controllers/spec/common.go | 6 ++++++ 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/controllers/function_controller.go b/controllers/function_controller.go index cba15048..82db9c3f 100644 --- a/controllers/function_controller.go +++ b/controllers/function_controller.go @@ -99,6 +99,13 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c function.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } + isNewGeneration := r.checkIfFunctionGenerationsIsIncreased(function) + + // skip reconcile if pauseRollout is set to true and the generation is not increased + if spec.IsPauseRollout(function) && !isNewGeneration { + return ctrl.Result{}, nil + } + err = r.ObserveFunctionStatefulSet(ctx, function) if err != nil { return reconcile.Result{}, err @@ -130,8 +137,6 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } - isNewGeneration := r.checkIfFunctionGenerationsIsIncreased(function) - err = r.ApplyFunctionStatefulSet(ctx, function, isNewGeneration) if err != nil { return reconcile.Result{}, err diff --git a/controllers/sink_controller.go b/controllers/sink_controller.go index 711f362c..5b3a1d74 100644 --- a/controllers/sink_controller.go +++ b/controllers/sink_controller.go @@ -98,6 +98,13 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. sink.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } + isNewGeneration := r.checkIfSinkGenerationsIsIncreased(sink) + + // skip reconcile if pauseRollout is set to true and the generation is not increased + if spec.IsPauseRollout(sink) && !isNewGeneration { + return ctrl.Result{}, nil + } + err = r.ObserveSinkStatefulSet(ctx, sink) if err != nil { return reconcile.Result{}, err @@ -129,8 +136,6 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. return ctrl.Result{}, err } - isNewGeneration := r.checkIfSinkGenerationsIsIncreased(sink) - err = r.ApplySinkStatefulSet(ctx, sink, isNewGeneration) if err != nil { return reconcile.Result{}, err diff --git a/controllers/source_controller.go b/controllers/source_controller.go index 70da00cc..a5ae0141 100644 --- a/controllers/source_controller.go +++ b/controllers/source_controller.go @@ -98,6 +98,13 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr source.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } + isNewGeneration := r.checkIfSourceGenerationsIsIncreased(source) + + // skip reconcile if pauseRollout is set to true and the generation is not increased + if spec.IsPauseRollout(source) && !isNewGeneration { + return ctrl.Result{}, nil + } + err = r.ObserveSourceStatefulSet(ctx, source) if err != nil { return reconcile.Result{}, err @@ -129,8 +136,6 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, err } - isNewGeneration := r.checkIfSourceGenerationsIsIncreased(source) - err = r.ApplySourceStatefulSet(ctx, source, isNewGeneration) if err != nil { return reconcile.Result{}, err diff --git a/controllers/spec/common.go b/controllers/spec/common.go index 18c56180..d8fa080c 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -99,6 +99,7 @@ const ( AnnotationPrometheusScrape = "prometheus.io/scrape" AnnotationPrometheusPort = "prometheus.io/port" AnnotationManaged = "compute.functionmesh.io/managed" + AnnotationPauseRollout = "compute.functionmesh.io/pause-rollout" AnnotationNeedCleanup = "compute.functionmesh.io/need-cleanup" // if labels contains below, we think it comes from function-mesh-worker-service @@ -171,6 +172,11 @@ func IsManaged(object metav1.Object) bool { return !exists || managed != "false" } +func IsPauseRollout(object metav1.Object) bool { + pauseRollout, exists := object.GetAnnotations()[AnnotationPauseRollout] + return exists && pauseRollout == "true" +} + func NeedCleanup(object metav1.Object) bool { // don't cleanup if it's managed by function-mesh-worker-service _, exists := object.GetLabels()[LabelPulsarCluster]