diff --git a/.ci/tests/integration/cases/java-download-function/manifests.yaml b/.ci/tests/integration/cases/java-download-function/manifests.yaml index e6eb76dc..4763bae5 100644 --- a/.ci/tests/integration/cases/java-download-function/manifests.yaml +++ b/.ci/tests/integration/cases/java-download-function/manifests.yaml @@ -1,6 +1,8 @@ apiVersion: compute.functionmesh.io/v1alpha1 kind: Function metadata: + annotations: + compute.functionmesh.io/pause-rollout: "false" name: function-download-sample namespace: default spec: diff --git a/.ci/tests/integration/cases/java-function-vpa/manifests.yaml b/.ci/tests/integration/cases/java-function-vpa/manifests.yaml index 219ae545..a5e4a79e 100644 --- a/.ci/tests/integration/cases/java-function-vpa/manifests.yaml +++ b/.ci/tests/integration/cases/java-function-vpa/manifests.yaml @@ -1,6 +1,8 @@ apiVersion: compute.functionmesh.io/v1alpha1 kind: Function metadata: + annotations: + compute.functionmesh.io/pause-rollout: "true" name: function-sample-vpa namespace: default spec: diff --git a/.ci/tests/integration/cases/java-function/manifests.yaml b/.ci/tests/integration/cases/java-function/manifests.yaml index d2b7391d..e7effe14 100644 --- a/.ci/tests/integration/cases/java-function/manifests.yaml +++ b/.ci/tests/integration/cases/java-function/manifests.yaml @@ -1,6 +1,8 @@ apiVersion: compute.functionmesh.io/v1alpha1 kind: Function metadata: + annotations: + compute.functionmesh.io/pause-rollout: "true" name: function-sample namespace: default spec: diff --git a/.ci/tests/integration/cases/java-log-config/manifests.yaml b/.ci/tests/integration/cases/java-log-config/manifests.yaml index 1e0c2fb9..5df73fbc 100644 --- a/.ci/tests/integration/cases/java-log-config/manifests.yaml +++ b/.ci/tests/integration/cases/java-log-config/manifests.yaml @@ -1,6 +1,8 @@ apiVersion: compute.functionmesh.io/v1alpha1 kind: Function metadata: + annotations: + compute.functionmesh.io/pause-rollout: "true" name: java-log-config namespace: default spec: diff --git a/.ci/tests/integration/cases/java-log-format-json/manifests.yaml b/.ci/tests/integration/cases/java-log-format-json/manifests.yaml index 6b9cbab1..989fb977 100644 --- a/.ci/tests/integration/cases/java-log-format-json/manifests.yaml +++ b/.ci/tests/integration/cases/java-log-format-json/manifests.yaml @@ -1,6 +1,8 @@ apiVersion: compute.functionmesh.io/v1alpha1 kind: Function metadata: + annotations: + compute.functionmesh.io/pause-rollout: "false" name: java-log-format-json namespace: default spec: diff --git a/api/compute/v1alpha1/function_types.go b/api/compute/v1alpha1/function_types.go index ae4f74a5..a513d620 100644 --- a/api/compute/v1alpha1/function_types.go +++ b/api/compute/v1alpha1/function_types.go @@ -125,6 +125,7 @@ type FunctionStatus struct { ObservedGeneration int64 `json:"observedGeneration,omitempty"` GlobalBackendConfigRevision string `json:"globalBackendConfigRevision,omitempty"` NamespacedBackendConfigRevision string `json:"namespacedBackendConfigRevision,omitempty"` + PendingChange string `json:"pendingChange,omitempty"` } // +genclient diff --git a/api/compute/v1alpha1/sink_types.go b/api/compute/v1alpha1/sink_types.go index 91a1fa43..94f3ab9c 100644 --- a/api/compute/v1alpha1/sink_types.go +++ b/api/compute/v1alpha1/sink_types.go @@ -115,6 +115,7 @@ type SinkStatus struct { ObservedGeneration int64 `json:"observedGeneration,omitempty"` GlobalBackendConfigRevision string `json:"globalBackendConfigRevision,omitempty"` NamespacedBackendConfigRevision string `json:"namespacedBackendConfigRevision,omitempty"` + PendingChange string `json:"pendingChange,omitempty"` } // +genclient diff --git a/api/compute/v1alpha1/source_types.go b/api/compute/v1alpha1/source_types.go index 0293f442..7de78960 100644 --- a/api/compute/v1alpha1/source_types.go +++ b/api/compute/v1alpha1/source_types.go @@ -120,6 +120,7 @@ type SourceStatus struct { ObservedGeneration int64 `json:"observedGeneration,omitempty"` GlobalBackendConfigRevision string `json:"globalBackendConfigRevision,omitempty"` NamespacedBackendConfigRevision string `json:"namespacedBackendConfigRevision,omitempty"` + PendingChange string `json:"pendingChange,omitempty"` } // +genclient diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml index a43fd4c6..8e3568ec 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml @@ -3846,6 +3846,8 @@ spec: observedGeneration: format: int64 type: integer + pendingChange: + type: string replicas: format: int32 type: integer diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml index 60c962be..b0c8bae6 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml @@ -3565,6 +3565,8 @@ spec: observedGeneration: format: int64 type: integer + pendingChange: + type: string replicas: format: int32 type: integer diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml index 7be766c3..fc7595fb 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml @@ -3546,6 +3546,8 @@ spec: observedGeneration: format: int64 type: integer + pendingChange: + type: string replicas: format: int32 type: integer diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 9fe9cdc4..94bfefa8 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -3824,6 +3824,8 @@ spec: observedGeneration: format: int64 type: integer + pendingChange: + type: string replicas: format: int32 type: integer diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index d4e8db48..099c3b3d 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -3543,6 +3543,8 @@ spec: observedGeneration: format: int64 type: integer + pendingChange: + type: string replicas: format: int32 type: integer diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index f817da2b..4c85f8c9 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -3524,6 +3524,8 @@ spec: observedGeneration: format: int64 type: integer + pendingChange: + type: string replicas: format: int32 type: integer diff --git a/controllers/function.go b/controllers/function.go index f84bc594..ccb35f57 100644 --- a/controllers/function.go +++ b/controllers/function.go @@ -429,7 +429,15 @@ func (r *FunctionReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, s if err != nil { return false, err } - return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec), nil + needUpdate := !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec) + if needUpdate { + diff, err := spec.CreateDiff(statefulSet, desiredStatefulSet) + if err != nil { + return needUpdate, err + } + function.Status.PendingChange = diff + } + return needUpdate, nil } func (r *FunctionReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, diff --git a/controllers/function_controller.go b/controllers/function_controller.go index cba15048..4719034f 100644 --- a/controllers/function_controller.go +++ b/controllers/function_controller.go @@ -99,10 +99,24 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c function.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } + isNewGeneration := r.checkIfFunctionGenerationsIsIncreased(function) + err = r.ObserveFunctionStatefulSet(ctx, function) if err != nil { return reconcile.Result{}, err } + // skip reconcile if pauseRollout is set to true and the generation is not increased + if spec.IsPauseRollout(function) && !isNewGeneration { + err = r.Status().Update(ctx, function) + if err != nil { + r.Log.Error(err, "failed to update function status after observing statefulset") + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } else { + function.Status.PendingChange = "" + } + err = r.ObserveFunctionService(ctx, function) if err != nil { return reconcile.Result{}, err @@ -130,8 +144,6 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } - isNewGeneration := r.checkIfFunctionGenerationsIsIncreased(function) - err = r.ApplyFunctionStatefulSet(ctx, function, isNewGeneration) if err != nil { return reconcile.Result{}, err diff --git a/controllers/sink.go b/controllers/sink.go index 235cedab..31505979 100644 --- a/controllers/sink.go +++ b/controllers/sink.go @@ -425,7 +425,15 @@ func (r *SinkReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, state if err != nil { return false, err } - return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec), nil + needUpdate := !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec) + if needUpdate { + diff, err := spec.CreateDiff(statefulSet, desiredStatefulSet) + if err != nil { + return needUpdate, err + } + sink.Status.PendingChange = diff + } + return needUpdate, nil } func (r *SinkReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, sink *v1alpha1.Sink) bool { diff --git a/controllers/sink_controller.go b/controllers/sink_controller.go index 711f362c..039c1611 100644 --- a/controllers/sink_controller.go +++ b/controllers/sink_controller.go @@ -98,10 +98,24 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. sink.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } + isNewGeneration := r.checkIfSinkGenerationsIsIncreased(sink) + err = r.ObserveSinkStatefulSet(ctx, sink) if err != nil { return reconcile.Result{}, err } + // skip reconcile if pauseRollout is set to true and the generation is not increased + if spec.IsPauseRollout(sink) && !isNewGeneration { + err = r.Status().Update(ctx, sink) + if err != nil { + r.Log.Error(err, "failed to update sink status after observing statefulset") + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } else { + sink.Status.PendingChange = "" + } + err = r.ObserveSinkService(ctx, sink) if err != nil { return reconcile.Result{}, err @@ -129,8 +143,6 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. return ctrl.Result{}, err } - isNewGeneration := r.checkIfSinkGenerationsIsIncreased(sink) - err = r.ApplySinkStatefulSet(ctx, sink, isNewGeneration) if err != nil { return reconcile.Result{}, err diff --git a/controllers/source.go b/controllers/source.go index 08685456..05ffcee8 100644 --- a/controllers/source.go +++ b/controllers/source.go @@ -427,7 +427,15 @@ func (r *SourceReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, sta if err != nil { return false, err } - return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec), nil + needUpdate := !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec) + if needUpdate { + diff, err := spec.CreateDiff(statefulSet, desiredStatefulSet) + if err != nil { + return needUpdate, err + } + source.Status.PendingChange = diff + } + return needUpdate, nil } func (r *SourceReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, source *v1alpha1.Source) bool { diff --git a/controllers/source_controller.go b/controllers/source_controller.go index 70da00cc..b6185667 100644 --- a/controllers/source_controller.go +++ b/controllers/source_controller.go @@ -98,10 +98,24 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr source.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } + isNewGeneration := r.checkIfSourceGenerationsIsIncreased(source) + err = r.ObserveSourceStatefulSet(ctx, source) if err != nil { return reconcile.Result{}, err } + // skip reconcile if pauseRollout is set to true and the generation is not increased + if spec.IsPauseRollout(source) && !isNewGeneration { + err = r.Status().Update(ctx, source) + if err != nil { + r.Log.Error(err, "failed to update source status after observing statefulset") + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } else { + source.Status.PendingChange = "" + } + err = r.ObserveSourceService(ctx, source) if err != nil { return reconcile.Result{}, err @@ -129,8 +143,6 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, err } - isNewGeneration := r.checkIfSourceGenerationsIsIncreased(source) - err = r.ApplySourceStatefulSet(ctx, source, isNewGeneration) if err != nil { return reconcile.Result{}, err diff --git a/controllers/spec/common.go b/controllers/spec/common.go index f8fac93c..abe5e34b 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -42,6 +42,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" @@ -99,6 +100,7 @@ const ( AnnotationPrometheusScrape = "prometheus.io/scrape" AnnotationPrometheusPort = "prometheus.io/port" AnnotationManaged = "compute.functionmesh.io/managed" + AnnotationPauseRollout = "compute.functionmesh.io/pause-rollout" AnnotationNeedCleanup = "compute.functionmesh.io/need-cleanup" // if labels contains below, we think it comes from function-mesh-worker-service @@ -171,6 +173,11 @@ func IsManaged(object metav1.Object) bool { return !exists || managed != "false" } +func IsPauseRollout(object metav1.Object) bool { + pauseRollout, exists := object.GetAnnotations()[AnnotationPauseRollout] + return exists && pauseRollout == "true" +} + func NeedCleanup(object metav1.Object) bool { // don't cleanup if it's managed by function-mesh-worker-service _, exists := object.GetLabels()[LabelPulsarCluster] @@ -509,8 +516,9 @@ func MakeLivenessProbe(liveness *v1alpha1.Liveness) *corev1.Probe { return &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ - Path: "/", - Port: intstr.FromInt32(MetricsPort.ContainerPort), + Path: "/", + Port: intstr.FromInt32(MetricsPort.ContainerPort), + Scheme: corev1.URISchemeHTTP, }, }, InitialDelaySeconds: initialDelay, @@ -2080,7 +2088,7 @@ func CheckIfStatefulSetSpecIsEqual(spec *appsv1.StatefulSetSpec, desiredSpec *ap if !reflect.DeepEqual(container.Command, desiredContainer.Command) || container.Image != desiredContainer.Image || container.ImagePullPolicy != desiredContainer.ImagePullPolicy || - container.LivenessProbe != desiredContainer.LivenessProbe || + !reflect.DeepEqual(container.LivenessProbe, desiredContainer.LivenessProbe) || !reflect.DeepEqual(ports, desiredPorts) || !reflect.DeepEqual(containerEnvFrom, desiredContainerEnvFrom) || !reflect.DeepEqual(container.Resources, desiredContainer.Resources) { @@ -2368,3 +2376,24 @@ func makeFilebeatContainer(volumeMounts []corev1.VolumeMount, envVar []corev1.En }, } } + +func CreateDiff(orj, modified *appsv1.StatefulSet) (string, error) { + orjCopy := orj.DeepCopyObject().(*appsv1.StatefulSet) + modifiedCopy := modified.DeepCopyObject().(*appsv1.StatefulSet) + modifiedCopy.Status = orjCopy.Status + modifiedCopy.ObjectMeta = orjCopy.ObjectMeta + + orjData, err := json.Marshal(orjCopy) + if err != nil { + return "", fmt.Errorf("marshal origin %w", err) + } + modifiedData, err := json.Marshal(modifiedCopy) + if err != nil { + return "", fmt.Errorf("marshal modified %w", err) + } + patch, err := strategicpatch.CreateTwoWayMergePatch(orjData, modifiedData, orjCopy) + if err != nil { + return "", fmt.Errorf("create diff %w", err) + } + return string(patch), nil +}