diff --git a/api/v1alpha1/function_types.go b/api/v1alpha1/function_types.go index e32a8e9db..261c35382 100644 --- a/api/v1alpha1/function_types.go +++ b/api/v1alpha1/function_types.go @@ -35,10 +35,10 @@ type FunctionSpec struct { Tenant string `json:"tenant,omitempty"` Namespace string `json:"namespace,omitempty"` ClusterName string `json:"clusterName,omitempty"` - // +kubebuilder:validation:Required // +kubebuilder:validation:Minimum=1 - Replicas *int32 `json:"replicas"` - + Replicas *int32 `json:"replicas,omitempty"` + // +kubebuilder:validation:Minimum=1 + MinReplicas *int32 `json:"minReplicas,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler // If provided, a default HPA with CPU at average of 80% will be used. // For complex HPA strategies, please refer to Pod.HPAutoscaler. @@ -94,9 +94,10 @@ type FunctionSpec struct { type FunctionStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file - Conditions map[Component]ResourceCondition `json:"conditions"` - Replicas int32 `json:"replicas"` - Selector string `json:"selector"` + Conditions map[Component]ResourceCondition `json:"conditions"` + Replicas int32 `json:"replicas"` + Selector string `json:"selector"` + ObservedGeneration int64 `json:"observedGeneration,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/v1alpha1/function_webhook.go b/api/v1alpha1/function_webhook.go index 63b366477..a1f150d57 100644 --- a/api/v1alpha1/function_webhook.go +++ b/api/v1alpha1/function_webhook.go @@ -48,9 +48,19 @@ var _ webhook.Defaulter = &Function{} func (r *Function) Default() { functionlog.Info("default", "name", r.Name) - if r.Spec.Replicas == nil { - r.Spec.Replicas = new(int32) - *r.Spec.Replicas = 1 + if !(r.Spec.Replicas != nil && r.Spec.MinReplicas != nil) { + if r.Spec.MinReplicas != nil && r.Spec.Replicas == nil { + r.Spec.Replicas = new(int32) + *r.Spec.Replicas = *r.Spec.MinReplicas + } else if r.Spec.MinReplicas == nil && r.Spec.Replicas != nil { + r.Spec.MinReplicas = new(int32) + *r.Spec.MinReplicas = *r.Spec.Replicas + } else { + r.Spec.Replicas = new(int32) + *r.Spec.Replicas = 1 + r.Spec.MinReplicas = new(int32) + *r.Spec.MinReplicas = 1 + } } if r.Spec.AutoAck == nil { @@ -179,7 +189,7 @@ func (r *Function) ValidateCreate() error { allErrs = append(allErrs, fieldErrs...) } - fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas) + fieldErrs = validateReplicasAndMinReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MinReplicas, r.Spec.MaxReplicas) if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } diff --git a/api/v1alpha1/functionmesh_types.go b/api/v1alpha1/functionmesh_types.go index 737107049..1324a5c04 100644 --- a/api/v1alpha1/functionmesh_types.go +++ b/api/v1alpha1/functionmesh_types.go @@ -41,6 +41,7 @@ type FunctionMeshStatus struct { SourceConditions map[string]ResourceCondition `json:"sourceConditions,omitempty"` SinkConditions map[string]ResourceCondition `json:"sinkConditions,omitempty"` FunctionConditions map[string]ResourceCondition `json:"functionConditions,omitempty"` + ObservedGeneration int64 `json:"observedGeneration,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/v1alpha1/sink_types.go b/api/v1alpha1/sink_types.go index ab05a190c..18d9cc846 100644 --- a/api/v1alpha1/sink_types.go +++ b/api/v1alpha1/sink_types.go @@ -36,10 +36,10 @@ type SinkSpec struct { Tenant string `json:"tenant,omitempty"` Namespace string `json:"namespace,omitempty"` SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector - // +kubebuilder:validation:Required // +kubebuilder:validation:Minimum=1 - Replicas *int32 `json:"replicas"` - + Replicas *int32 `json:"replicas,omitempty"` + // +kubebuilder:validation:Minimum=1 + MinReplicas *int32 `json:"minReplicas,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler // If provided, a default HPA with CPU at average of 80% will be used. // For complex HPA strategies, please refer to Pod.HPAutoscaler. @@ -85,9 +85,10 @@ type SinkSpec struct { type SinkStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file - Conditions map[Component]ResourceCondition `json:"conditions"` - Replicas int32 `json:"replicas"` - Selector string `json:"selector"` + Conditions map[Component]ResourceCondition `json:"conditions"` + Replicas int32 `json:"replicas"` + Selector string `json:"selector"` + ObservedGeneration int64 `json:"observedGeneration,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/v1alpha1/sink_webhook.go b/api/v1alpha1/sink_webhook.go index b835d9b6f..62581268e 100644 --- a/api/v1alpha1/sink_webhook.go +++ b/api/v1alpha1/sink_webhook.go @@ -49,9 +49,19 @@ var _ webhook.Defaulter = &Sink{} func (r *Sink) Default() { sinklog.Info("default", "name", r.Name) - if r.Spec.Replicas == nil { - r.Spec.Replicas = new(int32) - *r.Spec.Replicas = 1 + if !(r.Spec.Replicas != nil && r.Spec.MinReplicas != nil) { + if r.Spec.MinReplicas != nil && r.Spec.Replicas == nil { + r.Spec.Replicas = new(int32) + *r.Spec.Replicas = *r.Spec.MinReplicas + } else if r.Spec.MinReplicas == nil && r.Spec.Replicas != nil { + r.Spec.MinReplicas = new(int32) + *r.Spec.MinReplicas = *r.Spec.Replicas + } else { + r.Spec.Replicas = new(int32) + *r.Spec.Replicas = 1 + r.Spec.MinReplicas = new(int32) + *r.Spec.MinReplicas = 1 + } } if r.Spec.AutoAck == nil { @@ -129,7 +139,7 @@ func (r *Sink) ValidateCreate() error { allErrs = append(allErrs, fieldErrs...) } - fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas) + fieldErrs = validateReplicasAndMinReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MinReplicas, r.Spec.MaxReplicas) if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } diff --git a/api/v1alpha1/source_types.go b/api/v1alpha1/source_types.go index 13d3a78f0..415d4272f 100644 --- a/api/v1alpha1/source_types.go +++ b/api/v1alpha1/source_types.go @@ -36,9 +36,10 @@ type SourceSpec struct { Namespace string `json:"namespace,omitempty"` ClusterName string `json:"clusterName,omitempty"` SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector - // +kubebuilder:validation:Required - Replicas *int32 `json:"replicas"` - + // +kubebuilder:validation:Minimum=1 + Replicas *int32 `json:"replicas,omitempty"` + // +kubebuilder:validation:Minimum=1 + MinReplicas *int32 `json:"minReplicas,omitempty"` // MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler // If provided, a default HPA with CPU at average of 80% will be used. // For complex HPA strategies, please refer to Pod.HPAutoscaler. @@ -74,9 +75,10 @@ type SourceSpec struct { type SourceStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file - Conditions map[Component]ResourceCondition `json:"conditions"` - Replicas int32 `json:"replicas"` - Selector string `json:"selector"` + Conditions map[Component]ResourceCondition `json:"conditions"` + Replicas int32 `json:"replicas"` + Selector string `json:"selector"` + ObservedGeneration int64 `json:"observedGeneration,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/v1alpha1/source_webhook.go b/api/v1alpha1/source_webhook.go index d925f964b..00a0a4502 100644 --- a/api/v1alpha1/source_webhook.go +++ b/api/v1alpha1/source_webhook.go @@ -49,9 +49,19 @@ var _ webhook.Defaulter = &Source{} func (r *Source) Default() { sourcelog.Info("default", "name", r.Name) - if r.Spec.Replicas == nil { - r.Spec.Replicas = new(int32) - *r.Spec.Replicas = 1 + if !(r.Spec.Replicas != nil && r.Spec.MinReplicas != nil) { + if r.Spec.MinReplicas != nil && r.Spec.Replicas == nil { + r.Spec.Replicas = new(int32) + *r.Spec.Replicas = *r.Spec.MinReplicas + } else if r.Spec.MinReplicas == nil && r.Spec.Replicas != nil { + r.Spec.MinReplicas = new(int32) + *r.Spec.MinReplicas = *r.Spec.Replicas + } else { + r.Spec.Replicas = new(int32) + *r.Spec.Replicas = 1 + r.Spec.MinReplicas = new(int32) + *r.Spec.MinReplicas = 1 + } } if r.Spec.ProcessingGuarantee == "" { @@ -139,7 +149,7 @@ func (r *Source) ValidateCreate() error { allErrs = append(allErrs, fieldErrs...) } - fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas) + fieldErrs = validateReplicasAndMinReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MinReplicas, r.Spec.MaxReplicas) if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } diff --git a/api/v1alpha1/validate.go b/api/v1alpha1/validate.go index ac10c1058..d4cc15eba 100644 --- a/api/v1alpha1/validate.go +++ b/api/v1alpha1/validate.go @@ -91,13 +91,13 @@ func validateGolangRuntime(golang *GoRuntime) []*field.Error { return allErrs } -func validateReplicasAndMaxReplicas(replicas, maxReplicas *int32) []*field.Error { +func validateReplicasAndMinReplicasAndMaxReplicas(replicas, minReplicas, maxReplicas *int32) []*field.Error { var allErrs field.ErrorList // TODO: allow 0 replicas, currently hpa's min value has to be 1 - if replicas == nil { - e := field.Invalid(field.NewPath("spec").Child("replicas"), nil, "replicas cannot be nil") - allErrs = append(allErrs, e) - } + //if replicas == nil { + // e := field.Invalid(field.NewPath("spec").Child("replicas"), nil, "replicas cannot be nil") + // allErrs = append(allErrs, e) + //} if replicas != nil && *replicas <= 0 { e := field.Invalid(field.NewPath("spec").Child("replicas"), *replicas, "replicas cannot be zero or negative") @@ -109,6 +109,23 @@ func validateReplicasAndMaxReplicas(replicas, maxReplicas *int32) []*field.Error "maxReplicas must be greater than or equal to replicas") allErrs = append(allErrs, e) } + + if minReplicas != nil && *minReplicas <= 0 { + e := field.Invalid(field.NewPath("spec").Child("minReplicas"), *replicas, "minReplicas cannot be zero or negative") + allErrs = append(allErrs, e) + } + + if minReplicas != nil && replicas != nil && *minReplicas > *replicas { + e := field.Invalid(field.NewPath("spec").Child("minReplicas"), *replicas, + "minReplicas must be less than or equal to replicas") + allErrs = append(allErrs, e) + } + + if minReplicas != nil && maxReplicas != nil && *minReplicas > *maxReplicas { + e := field.Invalid(field.NewPath("spec").Child("minReplicas"), *maxReplicas, + "minReplicas must be less than or equal to maxReplicas") + allErrs = append(allErrs, e) + } return allErrs } diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 4cb12669d..433e8ca8e 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -321,6 +321,11 @@ func (in *FunctionSpec) DeepCopyInto(out *FunctionSpec) { *out = new(int32) **out = **in } + if in.MinReplicas != nil { + in, out := &in.MinReplicas, &out.MinReplicas + *out = new(int32) + **out = **in + } if in.MaxReplicas != nil { in, out := &in.MaxReplicas, &out.MaxReplicas *out = new(int32) @@ -917,6 +922,11 @@ func (in *SinkSpec) DeepCopyInto(out *SinkSpec) { *out = new(int32) **out = **in } + if in.MinReplicas != nil { + in, out := &in.MinReplicas, &out.MinReplicas + *out = new(int32) + **out = **in + } if in.MaxReplicas != nil { in, out := &in.MaxReplicas, &out.MaxReplicas *out = new(int32) @@ -1051,6 +1061,11 @@ func (in *SourceSpec) DeepCopyInto(out *SourceSpec) { *out = new(int32) **out = **in } + if in.MinReplicas != nil { + in, out := &in.MinReplicas, &out.MinReplicas + *out = new(int32) + **out = **in + } if in.MaxReplicas != nil { in, out := &in.MaxReplicas, &out.MaxReplicas *out = new(int32) diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml index 3698b21a4..537dc49c8 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml @@ -201,6 +201,10 @@ spec: maxReplicas: format: int32 type: integer + minReplicas: + format: int32 + minimum: 1 + type: integer name: type: string namespace: @@ -2981,6 +2985,10 @@ spec: maxReplicas: format: int32 type: integer + minReplicas: + format: int32 + minimum: 1 + type: integer name: type: string namespace: @@ -5491,8 +5499,6 @@ spec: - name type: object type: array - required: - - replicas type: object type: array sources: @@ -5581,6 +5587,10 @@ spec: maxReplicas: format: int32 type: integer + minReplicas: + format: int32 + minimum: 1 + type: integer name: type: string namespace: @@ -8076,6 +8086,7 @@ spec: type: object replicas: format: int32 + minimum: 1 type: integer resources: properties: @@ -8134,8 +8145,6 @@ spec: - name type: object type: array - required: - - replicas type: object type: array type: object @@ -8152,6 +8161,9 @@ spec: type: string type: object type: object + observedGeneration: + format: int64 + type: integer sinkConditions: additionalProperties: properties: diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml index fd9d8d5b6..b0c43fdce 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml @@ -220,6 +220,10 @@ spec: maxReplicas: format: int32 type: integer + minReplicas: + format: int32 + minimum: 1 + type: integer name: type: string namespace: @@ -2847,6 +2851,9 @@ spec: type: string type: object type: object + observedGeneration: + format: int64 + type: integer replicas: format: int32 type: integer diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml index 16215f025..a4468f05b 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml @@ -210,6 +210,10 @@ spec: maxReplicas: format: int32 type: integer + minReplicas: + format: int32 + minimum: 1 + type: integer name: type: string namespace: @@ -2720,8 +2724,6 @@ spec: - name type: object type: array - required: - - replicas type: object status: properties: @@ -2736,6 +2738,9 @@ spec: type: string type: object type: object + observedGeneration: + format: int64 + type: integer replicas: format: int32 type: integer diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml index 8b9de8053..7894aec99 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml @@ -131,6 +131,10 @@ spec: maxReplicas: format: int32 type: integer + minReplicas: + format: int32 + minimum: 1 + type: integer name: type: string namespace: @@ -2626,6 +2630,7 @@ spec: type: object replicas: format: int32 + minimum: 1 type: integer resources: properties: @@ -2684,8 +2689,6 @@ spec: - name type: object type: array - required: - - replicas type: object status: properties: @@ -2700,6 +2703,9 @@ spec: type: string type: object type: object + observedGeneration: + format: int64 + type: integer replicas: format: int32 type: integer diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index cb61a916a..8042f8d81 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -203,6 +203,10 @@ spec: maxReplicas: format: int32 type: integer + minReplicas: + format: int32 + minimum: 1 + type: integer name: type: string namespace: @@ -2983,6 +2987,10 @@ spec: maxReplicas: format: int32 type: integer + minReplicas: + format: int32 + minimum: 1 + type: integer name: type: string namespace: @@ -5493,8 +5501,6 @@ spec: - name type: object type: array - required: - - replicas type: object type: array sources: @@ -5583,6 +5589,10 @@ spec: maxReplicas: format: int32 type: integer + minReplicas: + format: int32 + minimum: 1 + type: integer name: type: string namespace: @@ -8078,6 +8088,7 @@ spec: type: object replicas: format: int32 + minimum: 1 type: integer resources: properties: @@ -8136,8 +8147,6 @@ spec: - name type: object type: array - required: - - replicas type: object type: array type: object @@ -8154,6 +8163,9 @@ spec: type: string type: object type: object + observedGeneration: + format: int64 + type: integer sinkConditions: additionalProperties: properties: diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 238962ea7..d4f7d77ad 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -200,6 +200,10 @@ spec: maxReplicas: format: int32 type: integer + minReplicas: + format: int32 + minimum: 1 + type: integer name: type: string namespace: @@ -2827,6 +2831,9 @@ spec: type: string type: object type: object + observedGeneration: + format: int64 + type: integer replicas: format: int32 type: integer diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index 450b2b474..6d84a0140 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -190,6 +190,10 @@ spec: maxReplicas: format: int32 type: integer + minReplicas: + format: int32 + minimum: 1 + type: integer name: type: string namespace: @@ -2700,8 +2704,6 @@ spec: - name type: object type: array - required: - - replicas type: object status: properties: @@ -2716,6 +2718,9 @@ spec: type: string type: object type: object + observedGeneration: + format: int64 + type: integer replicas: format: int32 type: integer diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index b17d239e2..c209c4759 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -111,6 +111,10 @@ spec: maxReplicas: format: int32 type: integer + minReplicas: + format: int32 + minimum: 1 + type: integer name: type: string namespace: @@ -2606,6 +2610,7 @@ spec: type: object replicas: format: int32 + minimum: 1 type: integer resources: properties: @@ -2664,8 +2669,6 @@ spec: - name type: object type: array - required: - - replicas type: object status: properties: @@ -2680,6 +2683,9 @@ spec: type: string type: object type: object + observedGeneration: + format: int64 + type: integer replicas: format: int32 type: integer diff --git a/controllers/function.go b/controllers/function.go index d079fcae0..64760f29e 100644 --- a/controllers/function.go +++ b/controllers/function.go @@ -19,7 +19,6 @@ package controllers import ( "context" - "reflect" "github.com/streamnative/function-mesh/api/v1alpha1" "github.com/streamnative/function-mesh/controllers/spec" @@ -32,8 +31,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ) -func (r *FunctionReconciler) ObserveFunctionStatefulSet(ctx context.Context, req ctrl.Request, - function *v1alpha1.Function) error { +func (r *FunctionReconciler) ObserveFunctionStatefulSet(ctx context.Context, function *v1alpha1.Function) error { condition, ok := function.Status.Conditions[v1alpha1.StatefulSet] if !ok { function.Status.Conditions[v1alpha1.StatefulSet] = v1alpha1.ResourceCondition{ @@ -51,7 +49,12 @@ func (r *FunctionReconciler) ObserveFunctionStatefulSet(ctx context.Context, req }, statefulSet) if err != nil { if errors.IsNotFound(err) { - r.Log.Info("function is not ready yet...") + r.Log.Info("function statefulSet is not ready yet...", + "namespace", function.Namespace, "name", function.Name, + "statefulSet name", statefulSet.Name) + condition.Status = metav1.ConditionFalse + condition.Action = v1alpha1.Create + function.Status.Conditions[v1alpha1.StatefulSet] = condition return nil } return err @@ -64,7 +67,7 @@ func (r *FunctionReconciler) ObserveFunctionStatefulSet(ctx context.Context, req } function.Status.Selector = selector.String() - if *statefulSet.Spec.Replicas != *function.Spec.Replicas || !reflect.DeepEqual(statefulSet.Spec.Template, spec.MakeFunctionStatefulSet(function).Spec.Template) { + if r.checkIfStatefulSetNeedUpdate(statefulSet, function) { condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Update function.Status.Conditions[v1alpha1.StatefulSet] = condition @@ -73,32 +76,36 @@ func (r *FunctionReconciler) ObserveFunctionStatefulSet(ctx context.Context, req if statefulSet.Status.ReadyReplicas == *function.Spec.Replicas { condition.Action = v1alpha1.NoAction - condition.Status = metav1.ConditionTrue } else { condition.Action = v1alpha1.Wait } + condition.Status = metav1.ConditionTrue function.Status.Replicas = *statefulSet.Spec.Replicas function.Status.Conditions[v1alpha1.StatefulSet] = condition - return nil } -func (r *FunctionReconciler) ApplyFunctionStatefulSet(ctx context.Context, function *v1alpha1.Function) error { +func (r *FunctionReconciler) ApplyFunctionStatefulSet(ctx context.Context, function *v1alpha1.Function, newGeneration bool) error { + condition := function.Status.Conditions[v1alpha1.StatefulSet] + if condition.Status == metav1.ConditionTrue && !newGeneration { + return nil + } desiredStatefulSet := spec.MakeFunctionStatefulSet(function) desiredStatefulSetSpec := desiredStatefulSet.Spec if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredStatefulSet, func() error { - // function statefulset mutate logic + // function statefulSet mutate logic desiredStatefulSet.Spec = desiredStatefulSetSpec return nil }); err != nil { - r.Log.Error(err, "error create or update statefulSet workload", "namespace", desiredStatefulSet.Namespace, "name", desiredStatefulSet.Name) + r.Log.Error(err, "error create or update statefulSet workload for function", + "namespace", function.Namespace, "name", function.Name, + "statefulSet name", desiredStatefulSet.Name) return err } return nil } -func (r *FunctionReconciler) ObserveFunctionService(ctx context.Context, req ctrl.Request, - function *v1alpha1.Function) error { +func (r *FunctionReconciler) ObserveFunctionService(ctx context.Context, function *v1alpha1.Function) error { condition, ok := function.Status.Conditions[v1alpha1.Service] if !ok { function.Status.Conditions[v1alpha1.Service] = v1alpha1.ResourceCondition{ @@ -109,17 +116,18 @@ func (r *FunctionReconciler) ObserveFunctionService(ctx context.Context, req ctr return nil } - if condition.Status == metav1.ConditionTrue { - return nil - } - svc := &corev1.Service{} svcName := spec.MakeHeadlessServiceName(spec.MakeFunctionObjectMeta(function).Name) err := r.Get(ctx, types.NamespacedName{Namespace: function.Namespace, Name: svcName}, svc) if err != nil { if errors.IsNotFound(err) { - r.Log.Info("service is not created...", "Name", function.Name, "ServiceName", svcName) + condition.Status = metav1.ConditionFalse + condition.Action = v1alpha1.Create + function.Status.Conditions[v1alpha1.Service] = condition + r.Log.Info("function service is not created...", + "namespace", function.Namespace, "name", function.Name, + "service name", svcName) return nil } return err @@ -131,30 +139,27 @@ func (r *FunctionReconciler) ObserveFunctionService(ctx context.Context, req ctr return nil } -func (r *FunctionReconciler) ApplyFunctionService(ctx context.Context, req ctrl.Request, - function *v1alpha1.Function) error { +func (r *FunctionReconciler) ApplyFunctionService(ctx context.Context, function *v1alpha1.Function, newGeneration bool) error { condition := function.Status.Conditions[v1alpha1.Service] - - if condition.Status == metav1.ConditionTrue { + if condition.Status == metav1.ConditionTrue && !newGeneration { return nil } - - switch condition.Action { - case v1alpha1.Create: - svc := spec.MakeFunctionService(function) - if err := r.Create(ctx, svc); err != nil { - r.Log.Error(err, "failed to expose service for function", "name", function.Name) - return err - } - case v1alpha1.Wait: - // do nothing + desiredService := spec.MakeFunctionService(function) + desiredServiceSpec := desiredService.Spec + if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredService, func() error { + // function service mutate logic + desiredService.Spec = desiredServiceSpec + return nil + }); err != nil { + r.Log.Error(err, "error create or update service for function", + "namespace", function.Namespace, "name", function.Name, + "service name", desiredService.Name) + return err } - return nil } -func (r *FunctionReconciler) ObserveFunctionHPA(ctx context.Context, req ctrl.Request, - function *v1alpha1.Function) error { +func (r *FunctionReconciler) ObserveFunctionHPA(ctx context.Context, function *v1alpha1.Function) error { if function.Spec.MaxReplicas == nil { // HPA not enabled, skip further action return nil @@ -170,26 +175,23 @@ func (r *FunctionReconciler) ObserveFunctionHPA(ctx context.Context, req ctrl.Re return nil } - if condition.Status == metav1.ConditionTrue { - return nil - } - hpa := &autov2beta2.HorizontalPodAutoscaler{} err := r.Get(ctx, types.NamespacedName{Namespace: function.Namespace, Name: spec.MakeFunctionObjectMeta(function).Name}, hpa) if err != nil { if errors.IsNotFound(err) { - r.Log.Info("hpa is not created for function...", "Name", function.Name) + condition.Status = metav1.ConditionFalse + condition.Action = v1alpha1.Create + function.Status.Conditions[v1alpha1.HPA] = condition + r.Log.Info("hpa is not created for function...", + "namespace", function.Namespace, "name", function.Name, + "hpa name", hpa.Name) return nil } return err } - if hpa.Spec.MaxReplicas != *function.Spec.MaxReplicas || - !reflect.DeepEqual(hpa.Spec.Metrics, function.Spec.Pod.AutoScalingMetrics) || - (function.Spec.Pod.AutoScalingBehavior != nil && hpa.Spec.Behavior == nil) || - (function.Spec.Pod.AutoScalingBehavior != nil && hpa.Spec.Behavior != nil && - !reflect.DeepEqual(*hpa.Spec.Behavior, *function.Spec.Pod.AutoScalingBehavior)) { + if r.checkIfHPANeedUpdate(hpa, function) { condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Update function.Status.Conditions[v1alpha1.HPA] = condition @@ -202,56 +204,34 @@ func (r *FunctionReconciler) ObserveFunctionHPA(ctx context.Context, req ctrl.Re return nil } -func (r *FunctionReconciler) ApplyFunctionHPA(ctx context.Context, req ctrl.Request, - function *v1alpha1.Function) error { +func (r *FunctionReconciler) ApplyFunctionHPA(ctx context.Context, function *v1alpha1.Function, newGeneration bool) error { if function.Spec.MaxReplicas == nil { // HPA not enabled, skip further action return nil } - condition := function.Status.Conditions[v1alpha1.HPA] - - if condition.Status == metav1.ConditionTrue { + if condition.Status == metav1.ConditionTrue && !newGeneration { return nil } - - switch condition.Action { - case v1alpha1.Create: - hpa := spec.MakeFunctionHPA(function) - if err := r.Create(ctx, hpa); err != nil { - r.Log.Error(err, "failed to create pod autoscaler for function", "name", function.Name) - return err - } - case v1alpha1.Update: - hpa := &autov2beta2.HorizontalPodAutoscaler{} - err := r.Get(ctx, types.NamespacedName{Namespace: function.Namespace, - Name: spec.MakeFunctionObjectMeta(function).Name}, hpa) - if err != nil { - r.Log.Error(err, "failed to update pod autoscaler for function, cannot find hpa", "name", function.Name) - return err - } - if hpa.Spec.MaxReplicas != *function.Spec.MaxReplicas { - hpa.Spec.MaxReplicas = *function.Spec.MaxReplicas - } - if len(function.Spec.Pod.AutoScalingMetrics) > 0 && !reflect.DeepEqual(hpa.Spec.Metrics, function.Spec.Pod.AutoScalingMetrics) { - hpa.Spec.Metrics = function.Spec.Pod.AutoScalingMetrics - } - if function.Spec.Pod.AutoScalingBehavior != nil { - hpa.Spec.Behavior = function.Spec.Pod.AutoScalingBehavior - } - if len(function.Spec.Pod.BuiltinAutoscaler) > 0 { - metrics := spec.MakeMetricsFromBuiltinHPARules(function.Spec.Pod.BuiltinAutoscaler) - if !reflect.DeepEqual(hpa.Spec.Metrics, metrics) { - hpa.Spec.Metrics = metrics - } - } - if err := r.Update(ctx, hpa); err != nil { - r.Log.Error(err, "failed to update pod autoscaler for function", "name", function.Name) - return err - } - case v1alpha1.Wait, v1alpha1.NoAction: - // do nothing + desiredHPA := spec.MakeFunctionHPA(function) + desiredHPASpec := desiredHPA.Spec + if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredHPA, func() error { + // function hpa mutate logic + desiredHPA.Spec = desiredHPASpec + return nil + }); err != nil { + r.Log.Error(err, "error create or update hpa for function", + "namespace", function.Namespace, "name", function.Name, + "hpa name", desiredHPA.Name) + return err } - return nil } + +func (r *FunctionReconciler) checkIfStatefulSetNeedUpdate(statefulSet *appsv1.StatefulSet, function *v1alpha1.Function) bool { + return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeFunctionStatefulSet(function).Spec) +} + +func (r *FunctionReconciler) checkIfHPANeedUpdate(hpa *autov2beta2.HorizontalPodAutoscaler, function *v1alpha1.Function) bool { + return !spec.CheckIfHPASpecIsEqual(&hpa.Spec, &spec.MakeFunctionHPA(function).Spec) +} diff --git a/controllers/function_controller.go b/controllers/function_controller.go index eef0a1656..570b187d6 100644 --- a/controllers/function_controller.go +++ b/controllers/function_controller.go @@ -29,7 +29,9 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -72,15 +74,15 @@ func (r *FunctionReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { function.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } - err = r.ObserveFunctionStatefulSet(ctx, req, function) + err = r.ObserveFunctionStatefulSet(ctx, function) if err != nil { return reconcile.Result{}, err } - err = r.ObserveFunctionService(ctx, req, function) + err = r.ObserveFunctionService(ctx, function) if err != nil { return reconcile.Result{}, err } - err = r.ObserveFunctionHPA(ctx, req, function) + err = r.ObserveFunctionHPA(ctx, function) if err != nil { return reconcile.Result{}, err } @@ -91,26 +93,38 @@ func (r *FunctionReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, err } - err = r.ApplyFunctionStatefulSet(ctx, function) + isNewGeneration := r.checkIfFunctionGenerationsIsIncreased(function) + + err = r.ApplyFunctionStatefulSet(ctx, function, isNewGeneration) if err != nil { return reconcile.Result{}, err } - err = r.ApplyFunctionService(ctx, req, function) + err = r.ApplyFunctionService(ctx, function, isNewGeneration) if err != nil { return reconcile.Result{}, err } - err = r.ApplyFunctionHPA(ctx, req, function) + err = r.ApplyFunctionHPA(ctx, function, isNewGeneration) if err != nil { return reconcile.Result{}, err } + function.Status.ObservedGeneration = function.Generation + err = r.Status().Update(ctx, function) + if err != nil { + r.Log.Error(err, "failed to update function status") + return ctrl.Result{}, err + } return ctrl.Result{}, nil } +func (r *FunctionReconciler) checkIfFunctionGenerationsIsIncreased(function *v1alpha1.Function) bool { + return function.Generation != function.Status.ObservedGeneration +} + func (r *FunctionReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.Function{}). - Owns(&appsv1.StatefulSet{}). + Owns(&appsv1.StatefulSet{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Owns(&corev1.Service{}). Owns(&autov2beta2.HorizontalPodAutoscaler{}). Owns(&corev1.Secret{}). diff --git a/controllers/function_controller_test.go b/controllers/function_controller_test.go index 10aeeb9d2..b1a9a488e 100644 --- a/controllers/function_controller_test.go +++ b/controllers/function_controller_test.go @@ -272,47 +272,32 @@ func createFunction(function *v1alpha1.Function) { log.Info("deleted resource", "namespace", key.Namespace, "name", key.Name, "test", CurrentGinkgoTestDescription().FullTestText) - statefulsets := new(appsv1.StatefulSetList) - err = k8sClient.List(context.Background(), statefulsets, client.InNamespace(function.Namespace)) + statefulset := new(appsv1.StatefulSet) + statefulsetKey := key + statefulsetKey.Name = spec.MakeFunctionObjectMeta(function).Name + err = k8sClient.Get(context.Background(), statefulsetKey, statefulset) Expect(err).Should(BeNil()) - for _, item := range statefulsets.Items { - log.Info("deleting statefulset resource", "namespace", key.Namespace, "name", key.Name, "test", - CurrentGinkgoTestDescription().FullTestText) - Expect(k8sClient.Delete(context.Background(), &item)).To(Succeed()) - } - log.Info("waiting for StatefulSet resource to disappear", "namespace", key.Namespace, "name", key.Name, "test", - CurrentGinkgoTestDescription().FullTestText) + Expect(k8sClient.Delete(context.Background(), statefulset)).To(Succeed()) Eventually(func() bool { - err := k8sClient.List(context.Background(), statefulsets, client.InNamespace(function.Namespace)) - log.Info("delete statefulset result", "err", err, "statefulsets", statefulsets) - containsFunction := false - for _, item := range statefulsets.Items { - if item.Name == function.Name { - containsFunction = true - } - } - return err == nil && !containsFunction + err := k8sClient.Get(context.Background(), statefulsetKey, statefulset) + log.Info("delete statefulset result", "err", err, "statefulset", statefulset) + return err != nil && strings.Contains(err.Error(), "not found") }, timeout, interval).Should(BeTrue()) log.Info("StatefulSet resource deleted", "namespace", key.Namespace, "name", key.Name, "test", CurrentGinkgoTestDescription().FullTestText) - hpas := new(autov2beta2.HorizontalPodAutoscalerList) - err = k8sClient.List(context.Background(), hpas, client.InNamespace(function.Namespace)) + hpa := new(autov2beta2.HorizontalPodAutoscaler) + hpaKey := key + hpaKey.Name = spec.MakeFunctionObjectMeta(function).Name + err = k8sClient.Get(context.Background(), hpaKey, hpa) Expect(err).Should(BeNil()) - for _, item := range hpas.Items { - log.Info("deleting HPA resource", "namespace", key.Namespace, "name", key.Name, "test", - CurrentGinkgoTestDescription().FullTestText) - log.Info("deleting HPA", "item", item) - Expect(k8sClient.Delete(context.Background(), &item)).To(Succeed()) - } - log.Info("waiting for HPA resource to disappear", "namespace", key.Namespace, "name", key.Name, "test", - CurrentGinkgoTestDescription().FullTestText) + Expect(k8sClient.Delete(context.Background(), hpa)).To(Succeed()) Eventually(func() bool { - err := k8sClient.List(context.Background(), hpas, client.InNamespace(function.Namespace)) - return err == nil && len(hpas.Items) == 0 + err := k8sClient.Get(context.Background(), hpaKey, hpa) + log.Info("delete hpa result", "err", err, "hpa", hpa) + return err != nil && strings.Contains(err.Error(), "not found") }, timeout, interval).Should(BeTrue()) log.Info("HPA resource deleted", "namespace", key.Namespace, "name", key.Name, "test", CurrentGinkgoTestDescription().FullTestText) - }) } diff --git a/controllers/function_mesh.go b/controllers/function_mesh.go index c4add6638..821abf6cd 100644 --- a/controllers/function_mesh.go +++ b/controllers/function_mesh.go @@ -226,7 +226,7 @@ func (r *FunctionMeshReconciler) observeSinks(ctx context.Context, mesh *v1alpha } func (r *FunctionMeshReconciler) UpdateFunctionMesh(ctx context.Context, req ctrl.Request, - mesh *v1alpha1.FunctionMesh) error { + mesh *v1alpha1.FunctionMesh, newGeneration bool) error { defer func() { err := r.Status().Update(ctx, mesh) if err != nil { @@ -236,6 +236,12 @@ func (r *FunctionMeshReconciler) UpdateFunctionMesh(ctx context.Context, req ctr for _, functionSpec := range mesh.Spec.Functions { condition := mesh.Status.FunctionConditions[functionSpec.Name] + if !newGeneration && + functionSpec.MaxReplicas != nil && + condition.Status == metav1.ConditionTrue && + condition.Action == v1alpha1.NoAction { + continue + } function := spec.MakeFunctionComponent(makeComponentName(mesh.Name, functionSpec.Name), mesh, &functionSpec) if err := r.CreateOrUpdateFunction(ctx, function, function.Spec); err != nil { r.Log.Error(err, "failed to handle function", "name", functionSpec.Name, "action", condition.Action) @@ -245,6 +251,12 @@ func (r *FunctionMeshReconciler) UpdateFunctionMesh(ctx context.Context, req ctr for _, sourceSpec := range mesh.Spec.Sources { condition := mesh.Status.SourceConditions[sourceSpec.Name] + if !newGeneration && + sourceSpec.MaxReplicas != nil && + condition.Status == metav1.ConditionTrue && + condition.Action == v1alpha1.NoAction { + continue + } source := spec.MakeSourceComponent(makeComponentName(mesh.Name, sourceSpec.Name), mesh, &sourceSpec) if err := r.CreateOrUpdateSource(ctx, source, source.Spec); err != nil { r.Log.Error(err, "failed to handle soure", "name", sourceSpec.Name, "action", condition.Action) @@ -254,6 +266,12 @@ func (r *FunctionMeshReconciler) UpdateFunctionMesh(ctx context.Context, req ctr for _, sinkSpec := range mesh.Spec.Sinks { condition := mesh.Status.SinkConditions[sinkSpec.Name] + if !newGeneration && + sinkSpec.MaxReplicas != nil && + condition.Status == metav1.ConditionTrue && + condition.Action == v1alpha1.NoAction { + continue + } sink := spec.MakeSinkComponent(makeComponentName(mesh.Name, sinkSpec.Name), mesh, &sinkSpec) if err := r.CreateOrUpdateSink(ctx, sink, sink.Spec); err != nil { r.Log.Error(err, "failed to handle sink", "name", sinkSpec.Name, "action", condition.Action) diff --git a/controllers/functionmesh_controller.go b/controllers/functionmesh_controller.go index aa47843a5..2fb008879 100644 --- a/controllers/functionmesh_controller.go +++ b/controllers/functionmesh_controller.go @@ -87,15 +87,27 @@ func (r *FunctionMeshReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error return ctrl.Result{}, err } + isNewGeneration := r.checkIfFunctionMeshGenerationsIsIncreased(mesh) + // apply changes - err = r.UpdateFunctionMesh(ctx, req, mesh) + err = r.UpdateFunctionMesh(ctx, req, mesh, isNewGeneration) if err != nil { return reconcile.Result{}, err } + mesh.Status.ObservedGeneration = mesh.Generation + err = r.Status().Update(ctx, mesh) + if err != nil { + r.Log.Error(err, "failed to update functionmesh status") + return ctrl.Result{}, err + } return ctrl.Result{}, nil } +func (r *FunctionMeshReconciler) checkIfFunctionMeshGenerationsIsIncreased(mesh *v1alpha1.FunctionMesh) bool { + return mesh.Generation != mesh.Status.ObservedGeneration +} + func (r *FunctionMeshReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.FunctionMesh{}). diff --git a/controllers/sink.go b/controllers/sink.go index e75a5deef..e8f5d54c2 100644 --- a/controllers/sink.go +++ b/controllers/sink.go @@ -19,7 +19,6 @@ package controllers import ( "context" - "reflect" "github.com/streamnative/function-mesh/api/v1alpha1" "github.com/streamnative/function-mesh/controllers/spec" @@ -32,12 +31,15 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ) -func (r *SinkReconciler) ObserveSinkStatefulSet(ctx context.Context, req ctrl.Request, - sink *v1alpha1.Sink) error { - condition := v1alpha1.ResourceCondition{ - Condition: v1alpha1.StatefulSetReady, - Status: metav1.ConditionFalse, - Action: v1alpha1.NoAction, +func (r *SinkReconciler) ObserveSinkStatefulSet(ctx context.Context, sink *v1alpha1.Sink) error { + condition, ok := sink.Status.Conditions[v1alpha1.StatefulSet] + if !ok { + sink.Status.Conditions[v1alpha1.StatefulSet] = v1alpha1.ResourceCondition{ + Condition: v1alpha1.StatefulSetReady, + Status: metav1.ConditionFalse, + Action: v1alpha1.Create, + } + return nil } statefulSet := &appsv1.StatefulSet{} @@ -47,58 +49,71 @@ func (r *SinkReconciler) ObserveSinkStatefulSet(ctx context.Context, req ctrl.Re }, statefulSet) if err != nil { if errors.IsNotFound(err) { - r.Log.Info("sink statefulset is not found...") + r.Log.Info("sink statefulSet is not found...", + "namespace", sink.Namespace, "name", sink.Name, + "statefulSet name", statefulSet.Name) + condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Create sink.Status.Conditions[v1alpha1.StatefulSet] = condition return nil } - - sink.Status.Conditions[v1alpha1.StatefulSet] = condition return err } - // statefulset created, waiting it to be ready - condition.Action = v1alpha1.Wait + selector, err := metav1.LabelSelectorAsSelector(statefulSet.Spec.Selector) + if err != nil { + r.Log.Error(err, "error retrieving statefulSet selector") + return err + } + sink.Status.Selector = selector.String() - if *statefulSet.Spec.Replicas != *sink.Spec.Replicas || !reflect.DeepEqual(statefulSet.Spec.Template, spec.MakeSinkStatefulSet(sink).Spec.Template) { + if r.checkIfStatefulSetNeedUpdate(statefulSet, sink) { + condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Update + sink.Status.Conditions[v1alpha1.StatefulSet] = condition + return nil } if statefulSet.Status.ReadyReplicas == *sink.Spec.Replicas { - condition.Status = metav1.ConditionTrue + condition.Action = v1alpha1.NoAction + } else { + condition.Action = v1alpha1.Wait } - - sink.Status.Replicas = statefulSet.Status.Replicas + condition.Status = metav1.ConditionTrue + sink.Status.Replicas = *statefulSet.Spec.Replicas sink.Status.Conditions[v1alpha1.StatefulSet] = condition - - selector, err := metav1.LabelSelectorAsSelector(statefulSet.Spec.Selector) - if err != nil { - r.Log.Error(err, "error retrieving statefulSet selector") - return err - } - sink.Status.Selector = selector.String() return nil } -func (r *SinkReconciler) ApplySinkStatefulSet(ctx context.Context, sink *v1alpha1.Sink) error { +func (r *SinkReconciler) ApplySinkStatefulSet(ctx context.Context, sink *v1alpha1.Sink, newGeneration bool) error { + condition := sink.Status.Conditions[v1alpha1.StatefulSet] + if condition.Status == metav1.ConditionTrue && !newGeneration { + return nil + } desiredStatefulSet := spec.MakeSinkStatefulSet(sink) desiredStatefulSetSpec := desiredStatefulSet.Spec if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredStatefulSet, func() error { - // sink statefulset mutate logic + // sink statefulSet mutate logic desiredStatefulSet.Spec = desiredStatefulSetSpec return nil }); err != nil { - r.Log.Error(err, "error create or update statefulSet workload", "namespace", desiredStatefulSet.Namespace, "name", desiredStatefulSet.Name) + r.Log.Error(err, "error create or update statefulSet workload for sink", + "namespace", sink.Namespace, "name", sink.Name, + "statefulSet name", desiredStatefulSet.Name) return err } return nil } -func (r *SinkReconciler) ObserveSinkService(ctx context.Context, req ctrl.Request, sink *v1alpha1.Sink) error { - condition := v1alpha1.ResourceCondition{ - Condition: v1alpha1.ServiceReady, - Status: metav1.ConditionFalse, - Action: v1alpha1.NoAction, +func (r *SinkReconciler) ObserveSinkService(ctx context.Context, sink *v1alpha1.Sink) error { + condition, ok := sink.Status.Conditions[v1alpha1.Service] + if !ok { + sink.Status.Conditions[v1alpha1.Service] = v1alpha1.ResourceCondition{ + Condition: v1alpha1.ServiceReady, + Status: metav1.ConditionFalse, + Action: v1alpha1.Create, + } + return nil } svc := &corev1.Service{} @@ -107,43 +122,44 @@ func (r *SinkReconciler) ObserveSinkService(ctx context.Context, req ctrl.Reques Name: svcName}, svc) if err != nil { if errors.IsNotFound(err) { + condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Create - r.Log.Info("service is not created...", "Name", sink.Name, "ServiceName", svcName) - } else { sink.Status.Conditions[v1alpha1.Service] = condition - return err + r.Log.Info("sink service is not created...", + "namespace", sink.Namespace, "name", sink.Name, + "service name", svcName) + return nil } - } else { - // service object doesn't have status, so once it's created just consider it's ready - condition.Status = metav1.ConditionTrue + return err } + condition.Action = v1alpha1.NoAction + condition.Status = metav1.ConditionTrue sink.Status.Conditions[v1alpha1.Service] = condition return nil } -func (r *SinkReconciler) ApplySinkService(ctx context.Context, req ctrl.Request, sink *v1alpha1.Sink) error { +func (r *SinkReconciler) ApplySinkService(ctx context.Context, sink *v1alpha1.Sink, newGeneration bool) error { condition := sink.Status.Conditions[v1alpha1.Service] - - if condition.Status == metav1.ConditionTrue { + if condition.Status == metav1.ConditionTrue && !newGeneration { return nil } - - switch condition.Action { - case v1alpha1.Create: - svc := spec.MakeSinkService(sink) - if err := r.Create(ctx, svc); err != nil { - r.Log.Error(err, "failed to expose service for sink", "name", sink.Name) - return err - } - case v1alpha1.Wait, v1alpha1.NoAction: - // do nothing + desiredService := spec.MakeSinkService(sink) + desiredServiceSpec := desiredService.Spec + if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredService, func() error { + // sink service mutate logic + desiredService.Spec = desiredServiceSpec + return nil + }); err != nil { + r.Log.Error(err, "error create or update service for sink", + "namespace", sink.Namespace, "name", sink.Name, + "service name", desiredService.Name) + return err } - return nil } -func (r *SinkReconciler) ObserveSinkHPA(ctx context.Context, req ctrl.Request, sink *v1alpha1.Sink) error { +func (r *SinkReconciler) ObserveSinkHPA(ctx context.Context, sink *v1alpha1.Sink) error { if sink.Spec.MaxReplicas == nil { // HPA not enabled, skip further action return nil @@ -159,26 +175,23 @@ func (r *SinkReconciler) ObserveSinkHPA(ctx context.Context, req ctrl.Request, s return nil } - if condition.Status == metav1.ConditionTrue { - return nil - } - hpa := &autov2beta2.HorizontalPodAutoscaler{} err := r.Get(ctx, types.NamespacedName{Namespace: sink.Namespace, Name: spec.MakeSinkObjectMeta(sink).Name}, hpa) if err != nil { if errors.IsNotFound(err) { - r.Log.Info("hpa is not created for sink...", "Name", sink.Name) + condition.Status = metav1.ConditionFalse + condition.Action = v1alpha1.Create + sink.Status.Conditions[v1alpha1.HPA] = condition + r.Log.Info("sink hpa is not created for sink...", + "namespace", sink.Namespace, "name", sink.Name, + "hpa name", hpa.Name) return nil } return err } - if hpa.Spec.MaxReplicas != *sink.Spec.MaxReplicas || - !reflect.DeepEqual(hpa.Spec.Metrics, sink.Spec.Pod.AutoScalingMetrics) || - (sink.Spec.Pod.AutoScalingBehavior != nil && hpa.Spec.Behavior == nil) || - (sink.Spec.Pod.AutoScalingBehavior != nil && hpa.Spec.Behavior != nil && - !reflect.DeepEqual(*hpa.Spec.Behavior, *sink.Spec.Pod.AutoScalingBehavior)) { + if r.checkIfHPANeedUpdate(hpa, sink) { condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Update sink.Status.Conditions[v1alpha1.HPA] = condition @@ -191,55 +204,34 @@ func (r *SinkReconciler) ObserveSinkHPA(ctx context.Context, req ctrl.Request, s return nil } -func (r *SinkReconciler) ApplySinkHPA(ctx context.Context, req ctrl.Request, sink *v1alpha1.Sink) error { +func (r *SinkReconciler) ApplySinkHPA(ctx context.Context, sink *v1alpha1.Sink, newGeneration bool) error { if sink.Spec.MaxReplicas == nil { // HPA not enabled, skip further action return nil } - condition := sink.Status.Conditions[v1alpha1.HPA] - - if condition.Status == metav1.ConditionTrue { + if condition.Status == metav1.ConditionTrue && !newGeneration { return nil } - - switch condition.Action { - case v1alpha1.Create: - hpa := spec.MakeSinkHPA(sink) - if err := r.Create(ctx, hpa); err != nil { - r.Log.Error(err, "failed to create pod autoscaler for sink", "name", sink.Name) - return err - } - case v1alpha1.Update: - hpa := &autov2beta2.HorizontalPodAutoscaler{} - err := r.Get(ctx, types.NamespacedName{Namespace: sink.Namespace, - Name: spec.MakeSinkObjectMeta(sink).Name}, hpa) - if err != nil { - r.Log.Error(err, "failed to update pod autoscaler for sink, cannot find hpa", "name", sink.Name) - return err - } - if hpa.Spec.MaxReplicas != *sink.Spec.MaxReplicas { - hpa.Spec.MaxReplicas = *sink.Spec.MaxReplicas - } - if len(sink.Spec.Pod.AutoScalingMetrics) > 0 && !reflect.DeepEqual(hpa.Spec.Metrics, sink.Spec.Pod.AutoScalingMetrics) { - hpa.Spec.Metrics = sink.Spec.Pod.AutoScalingMetrics - } - if sink.Spec.Pod.AutoScalingBehavior != nil { - hpa.Spec.Behavior = sink.Spec.Pod.AutoScalingBehavior - } - if len(sink.Spec.Pod.BuiltinAutoscaler) > 0 { - metrics := spec.MakeMetricsFromBuiltinHPARules(sink.Spec.Pod.BuiltinAutoscaler) - if !reflect.DeepEqual(hpa.Spec.Metrics, metrics) { - hpa.Spec.Metrics = metrics - } - } - if err := r.Update(ctx, hpa); err != nil { - r.Log.Error(err, "failed to update pod autoscaler for sink", "name", sink.Name) - return err - } - case v1alpha1.Wait, v1alpha1.NoAction: - // do nothing + desiredHPA := spec.MakeSinkHPA(sink) + desiredHPASpec := desiredHPA.Spec + if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredHPA, func() error { + // sink hpa mutate logic + desiredHPA.Spec = desiredHPASpec + return nil + }); err != nil { + r.Log.Error(err, "error create or update hpa for sink", + "namespace", sink.Namespace, "name", sink.Name, + "hpa name", desiredHPA.Name) + return err } - return nil } + +func (r *SinkReconciler) checkIfStatefulSetNeedUpdate(statefulSet *appsv1.StatefulSet, sink *v1alpha1.Sink) bool { + return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSinkStatefulSet(sink).Spec) +} + +func (r *SinkReconciler) checkIfHPANeedUpdate(hpa *autov2beta2.HorizontalPodAutoscaler, sink *v1alpha1.Sink) bool { + return !spec.CheckIfHPASpecIsEqual(&hpa.Spec, &spec.MakeSinkHPA(sink).Spec) +} diff --git a/controllers/sink_controller.go b/controllers/sink_controller.go index 9d5949511..11ca15785 100644 --- a/controllers/sink_controller.go +++ b/controllers/sink_controller.go @@ -20,17 +20,18 @@ package controllers import ( "context" - "github.com/streamnative/function-mesh/controllers/spec" - "github.com/go-logr/logr" - computev1alpha1 "github.com/streamnative/function-mesh/api/v1alpha1" + "github.com/streamnative/function-mesh/api/v1alpha1" + "github.com/streamnative/function-mesh/controllers/spec" appsv1 "k8s.io/api/apps/v1" autov2beta2 "k8s.io/api/autoscaling/v2beta2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -52,7 +53,7 @@ func (r *SinkReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { _ = r.Log.WithValues("sink", req.NamespacedName) // your logic here - sink := &computev1alpha1.Sink{} + sink := &v1alpha1.Sink{} err := r.Get(ctx, req.NamespacedName, sink) if err != nil { if errors.IsNotFound(err) { @@ -68,18 +69,18 @@ func (r *SinkReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { } if sink.Status.Conditions == nil { - sink.Status.Conditions = make(map[computev1alpha1.Component]computev1alpha1.ResourceCondition) + sink.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } - err = r.ObserveSinkStatefulSet(ctx, req, sink) + err = r.ObserveSinkStatefulSet(ctx, sink) if err != nil { return reconcile.Result{}, err } - err = r.ObserveSinkService(ctx, req, sink) + err = r.ObserveSinkService(ctx, sink) if err != nil { return reconcile.Result{}, err } - err = r.ObserveSinkHPA(ctx, req, sink) + err = r.ObserveSinkHPA(ctx, sink) if err != nil { return reconcile.Result{}, err } @@ -90,26 +91,38 @@ func (r *SinkReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, err } - err = r.ApplySinkStatefulSet(ctx, sink) + isNewGeneration := r.checkIfSinkGenerationsIsIncreased(sink) + + err = r.ApplySinkStatefulSet(ctx, sink, isNewGeneration) if err != nil { return reconcile.Result{}, err } - err = r.ApplySinkService(ctx, req, sink) + err = r.ApplySinkService(ctx, sink, isNewGeneration) if err != nil { return reconcile.Result{}, err } - err = r.ApplySinkHPA(ctx, req, sink) + err = r.ApplySinkHPA(ctx, sink, isNewGeneration) if err != nil { return reconcile.Result{}, err } + sink.Status.ObservedGeneration = sink.Generation + err = r.Status().Update(ctx, sink) + if err != nil { + r.Log.Error(err, "failed to update sink status") + return ctrl.Result{}, err + } return ctrl.Result{}, nil } +func (r *SinkReconciler) checkIfSinkGenerationsIsIncreased(sink *v1alpha1.Sink) bool { + return sink.Generation != sink.Status.ObservedGeneration +} + func (r *SinkReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&computev1alpha1.Sink{}). - Owns(&appsv1.StatefulSet{}). + For(&v1alpha1.Sink{}). + Owns(&appsv1.StatefulSet{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Owns(&corev1.Service{}). Owns(&autov2beta2.HorizontalPodAutoscaler{}). Complete(r) diff --git a/controllers/source.go b/controllers/source.go index a190e0816..9bc936bab 100644 --- a/controllers/source.go +++ b/controllers/source.go @@ -19,7 +19,6 @@ package controllers import ( "context" - "reflect" "github.com/streamnative/function-mesh/api/v1alpha1" "github.com/streamnative/function-mesh/controllers/spec" @@ -32,12 +31,15 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ) -func (r *SourceReconciler) ObserveSourceStatefulSet(ctx context.Context, req ctrl.Request, - source *v1alpha1.Source) error { - condition := v1alpha1.ResourceCondition{ - Condition: v1alpha1.StatefulSetReady, - Status: metav1.ConditionFalse, - Action: v1alpha1.NoAction, +func (r *SourceReconciler) ObserveSourceStatefulSet(ctx context.Context, source *v1alpha1.Source) error { + condition, ok := source.Status.Conditions[v1alpha1.StatefulSet] + if !ok { + source.Status.Conditions[v1alpha1.StatefulSet] = v1alpha1.ResourceCondition{ + Condition: v1alpha1.StatefulSetReady, + Status: metav1.ConditionFalse, + Action: v1alpha1.Create, + } + return nil } statefulSet := &appsv1.StatefulSet{} @@ -47,58 +49,71 @@ func (r *SourceReconciler) ObserveSourceStatefulSet(ctx context.Context, req ctr }, statefulSet) if err != nil { if errors.IsNotFound(err) { - r.Log.Info("source statefulset is not found...") + r.Log.Info("source statefulSet is not ready yet...", + "namespace", source.Namespace, "name", source.Name, + "statefulSet name", statefulSet.Name) + condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Create source.Status.Conditions[v1alpha1.StatefulSet] = condition return nil } - - source.Status.Conditions[v1alpha1.StatefulSet] = condition return err } - // statefulset created, waiting it to be ready - condition.Action = v1alpha1.Wait + selector, err := metav1.LabelSelectorAsSelector(statefulSet.Spec.Selector) + if err != nil { + r.Log.Error(err, "error retrieving statefulSet selector") + return err + } + source.Status.Selector = selector.String() - if *statefulSet.Spec.Replicas != *source.Spec.Replicas || !reflect.DeepEqual(statefulSet.Spec.Template, spec.MakeSourceStatefulSet(source).Spec.Template) { + if r.checkIfStatefulSetNeedUpdate(statefulSet, source) { + condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Update + source.Status.Conditions[v1alpha1.StatefulSet] = condition + return nil } if statefulSet.Status.ReadyReplicas == *source.Spec.Replicas { - condition.Status = metav1.ConditionTrue + condition.Action = v1alpha1.NoAction + } else { + condition.Action = v1alpha1.Wait } - - source.Status.Replicas = statefulSet.Status.Replicas + condition.Status = metav1.ConditionTrue + source.Status.Replicas = *statefulSet.Spec.Replicas source.Status.Conditions[v1alpha1.StatefulSet] = condition - - selector, err := metav1.LabelSelectorAsSelector(statefulSet.Spec.Selector) - if err != nil { - r.Log.Error(err, "error retrieving statefulSet selector") - return err - } - source.Status.Selector = selector.String() return nil } -func (r *SourceReconciler) ApplySourceStatefulSet(ctx context.Context, source *v1alpha1.Source) error { +func (r *SourceReconciler) ApplySourceStatefulSet(ctx context.Context, source *v1alpha1.Source, newGeneration bool) error { + condition := source.Status.Conditions[v1alpha1.StatefulSet] + if condition.Status == metav1.ConditionTrue && !newGeneration { + return nil + } desiredStatefulSet := spec.MakeSourceStatefulSet(source) desiredStatefulSetSpec := desiredStatefulSet.Spec if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredStatefulSet, func() error { - // source statefulset mutate logic + // source statefulSet mutate logic desiredStatefulSet.Spec = desiredStatefulSetSpec return nil }); err != nil { - r.Log.Error(err, "error create or update statefulSet workload", "namespace", desiredStatefulSet.Namespace, "name", desiredStatefulSet.Name) + r.Log.Error(err, "error create or update statefulSet workload for source", + "namespace", source.Namespace, "name", source.Name, + "statefulSet name", desiredStatefulSet.Name) return err } return nil } -func (r *SourceReconciler) ObserveSourceService(ctx context.Context, req ctrl.Request, source *v1alpha1.Source) error { - condition := v1alpha1.ResourceCondition{ - Condition: v1alpha1.ServiceReady, - Status: metav1.ConditionFalse, - Action: v1alpha1.NoAction, +func (r *SourceReconciler) ObserveSourceService(ctx context.Context, source *v1alpha1.Source) error { + condition, ok := source.Status.Conditions[v1alpha1.Service] + if !ok { + source.Status.Conditions[v1alpha1.Service] = v1alpha1.ResourceCondition{ + Condition: v1alpha1.ServiceReady, + Status: metav1.ConditionFalse, + Action: v1alpha1.Create, + } + return nil } svc := &corev1.Service{} @@ -107,43 +122,44 @@ func (r *SourceReconciler) ObserveSourceService(ctx context.Context, req ctrl.Re Name: svcName}, svc) if err != nil { if errors.IsNotFound(err) { + condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Create - r.Log.Info("service is not created...", "Name", source.Name, "ServiceName", svcName) - } else { source.Status.Conditions[v1alpha1.Service] = condition - return err + r.Log.Info("source service is not created...", + "namespace", source.Namespace, "name", source.Name, + "service name", svcName) + return nil } - } else { - // service object doesn't have status, so once it's created just consider it's ready - condition.Status = metav1.ConditionTrue + return err } + condition.Action = v1alpha1.NoAction + condition.Status = metav1.ConditionTrue source.Status.Conditions[v1alpha1.Service] = condition return nil } -func (r *SourceReconciler) ApplySourceService(ctx context.Context, req ctrl.Request, source *v1alpha1.Source) error { +func (r *SourceReconciler) ApplySourceService(ctx context.Context, source *v1alpha1.Source, newGeneration bool) error { condition := source.Status.Conditions[v1alpha1.Service] - - if condition.Status == metav1.ConditionTrue { + if condition.Status == metav1.ConditionTrue && !newGeneration { return nil } - - switch condition.Action { - case v1alpha1.Create: - svc := spec.MakeSourceService(source) - if err := r.Create(ctx, svc); err != nil { - r.Log.Error(err, "failed to expose service for source", "name", source.Name) - return err - } - case v1alpha1.Wait, v1alpha1.NoAction: - // do nothing + desiredService := spec.MakeSourceService(source) + desiredServiceSpec := desiredService.Spec + if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredService, func() error { + // source service mutate logic + desiredService.Spec = desiredServiceSpec + return nil + }); err != nil { + r.Log.Error(err, "error create or update service for source", + "namespace", source.Namespace, "name", source.Name, + "service name", desiredService.Name) + return err } - return nil } -func (r *SourceReconciler) ObserveSourceHPA(ctx context.Context, req ctrl.Request, source *v1alpha1.Source) error { +func (r *SourceReconciler) ObserveSourceHPA(ctx context.Context, source *v1alpha1.Source) error { if source.Spec.MaxReplicas == nil { // HPA not enabled, skip further action return nil @@ -164,17 +180,18 @@ func (r *SourceReconciler) ObserveSourceHPA(ctx context.Context, req ctrl.Reques Name: spec.MakeSourceObjectMeta(source).Name}, hpa) if err != nil { if errors.IsNotFound(err) { - r.Log.Info("hpa is not created for source...", "Name", source.Name) + condition.Status = metav1.ConditionFalse + condition.Action = v1alpha1.Create + source.Status.Conditions[v1alpha1.HPA] = condition + r.Log.Info("hpa is not created for source...", + "namespace", source.Namespace, "name", source.Name, + "hpa name", hpa.Name) return nil } return err } - if hpa.Spec.MaxReplicas != *source.Spec.MaxReplicas || - !reflect.DeepEqual(hpa.Spec.Metrics, source.Spec.Pod.AutoScalingMetrics) || - (source.Spec.Pod.AutoScalingBehavior != nil && hpa.Spec.Behavior == nil) || - (source.Spec.Pod.AutoScalingBehavior != nil && hpa.Spec.Behavior != nil && - !reflect.DeepEqual(*hpa.Spec.Behavior, *source.Spec.Pod.AutoScalingBehavior)) { + if r.checkIfHPANeedUpdate(hpa, source) { condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Update source.Status.Conditions[v1alpha1.HPA] = condition @@ -187,55 +204,34 @@ func (r *SourceReconciler) ObserveSourceHPA(ctx context.Context, req ctrl.Reques return nil } -func (r *SourceReconciler) ApplySourceHPA(ctx context.Context, req ctrl.Request, source *v1alpha1.Source) error { +func (r *SourceReconciler) ApplySourceHPA(ctx context.Context, source *v1alpha1.Source, newGeneration bool) error { if source.Spec.MaxReplicas == nil { // HPA not enabled, skip further action return nil } - condition := source.Status.Conditions[v1alpha1.HPA] - - if condition.Status == metav1.ConditionTrue { + if condition.Status == metav1.ConditionTrue && !newGeneration { return nil } - - switch condition.Action { - case v1alpha1.Create: - hpa := spec.MakeSourceHPA(source) - if err := r.Create(ctx, hpa); err != nil { - r.Log.Error(err, "failed to create pod autoscaler for source", "name", source.Name) - return err - } - case v1alpha1.Update: - hpa := &autov2beta2.HorizontalPodAutoscaler{} - err := r.Get(ctx, types.NamespacedName{Namespace: source.Namespace, - Name: spec.MakeSourceObjectMeta(source).Name}, hpa) - if err != nil { - r.Log.Error(err, "failed to update pod autoscaler for source, cannot find hpa", "name", source.Name) - return err - } - if hpa.Spec.MaxReplicas != *source.Spec.MaxReplicas { - hpa.Spec.MaxReplicas = *source.Spec.MaxReplicas - } - if len(source.Spec.Pod.AutoScalingMetrics) > 0 && !reflect.DeepEqual(hpa.Spec.Metrics, source.Spec.Pod.AutoScalingMetrics) { - hpa.Spec.Metrics = source.Spec.Pod.AutoScalingMetrics - } - if source.Spec.Pod.AutoScalingBehavior != nil { - hpa.Spec.Behavior = source.Spec.Pod.AutoScalingBehavior - } - if len(source.Spec.Pod.BuiltinAutoscaler) > 0 { - metrics := spec.MakeMetricsFromBuiltinHPARules(source.Spec.Pod.BuiltinAutoscaler) - if !reflect.DeepEqual(hpa.Spec.Metrics, metrics) { - hpa.Spec.Metrics = metrics - } - } - if err := r.Update(ctx, hpa); err != nil { - r.Log.Error(err, "failed to update pod autoscaler for source", "name", source.Name) - return err - } - case v1alpha1.Wait, v1alpha1.NoAction: - // do nothing + desiredHPA := spec.MakeSourceHPA(source) + desiredHPASpec := desiredHPA.Spec + if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredHPA, func() error { + // source hpa mutate logic + desiredHPA.Spec = desiredHPASpec + return nil + }); err != nil { + r.Log.Error(err, "error create or update hpa for source", + "namespace", source.Namespace, "name", source.Name, + "hpa name", desiredHPA.Name) + return err } - return nil } + +func (r *SourceReconciler) checkIfStatefulSetNeedUpdate(statefulSet *appsv1.StatefulSet, source *v1alpha1.Source) bool { + return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSourceStatefulSet(source).Spec) +} + +func (r *SourceReconciler) checkIfHPANeedUpdate(hpa *autov2beta2.HorizontalPodAutoscaler, source *v1alpha1.Source) bool { + return !spec.CheckIfHPASpecIsEqual(&hpa.Spec, &spec.MakeSourceHPA(source).Spec) +} diff --git a/controllers/source_controller.go b/controllers/source_controller.go index ace90f89d..3622db862 100644 --- a/controllers/source_controller.go +++ b/controllers/source_controller.go @@ -20,17 +20,18 @@ package controllers import ( "context" - "github.com/streamnative/function-mesh/controllers/spec" - "github.com/go-logr/logr" - computev1alpha1 "github.com/streamnative/function-mesh/api/v1alpha1" + "github.com/streamnative/function-mesh/api/v1alpha1" + "github.com/streamnative/function-mesh/controllers/spec" appsv1 "k8s.io/api/apps/v1" autov2beta2 "k8s.io/api/autoscaling/v2beta2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -52,7 +53,7 @@ func (r *SourceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { _ = r.Log.WithValues("source", req.NamespacedName) // your logic here - source := &computev1alpha1.Source{} + source := &v1alpha1.Source{} err := r.Get(ctx, req.NamespacedName, source) if err != nil { if errors.IsNotFound(err) { @@ -68,18 +69,18 @@ func (r *SourceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { } if source.Status.Conditions == nil { - source.Status.Conditions = make(map[computev1alpha1.Component]computev1alpha1.ResourceCondition) + source.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } - err = r.ObserveSourceStatefulSet(ctx, req, source) + err = r.ObserveSourceStatefulSet(ctx, source) if err != nil { return reconcile.Result{}, err } - err = r.ObserveSourceService(ctx, req, source) + err = r.ObserveSourceService(ctx, source) if err != nil { return reconcile.Result{}, err } - err = r.ObserveSourceHPA(ctx, req, source) + err = r.ObserveSourceHPA(ctx, source) if err != nil { return reconcile.Result{}, err } @@ -90,26 +91,38 @@ func (r *SourceReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, err } - err = r.ApplySourceStatefulSet(ctx, source) + isNewGeneration := r.checkIfSourceGenerationsIsIncreased(source) + + err = r.ApplySourceStatefulSet(ctx, source, isNewGeneration) if err != nil { return reconcile.Result{}, err } - err = r.ApplySourceService(ctx, req, source) + err = r.ApplySourceService(ctx, source, isNewGeneration) if err != nil { return reconcile.Result{}, err } - err = r.ApplySourceHPA(ctx, req, source) + err = r.ApplySourceHPA(ctx, source, isNewGeneration) if err != nil { return reconcile.Result{}, err } + source.Status.ObservedGeneration = source.Generation + err = r.Status().Update(ctx, source) + if err != nil { + r.Log.Error(err, "failed to update source status") + return ctrl.Result{}, err + } return ctrl.Result{}, nil } +func (r *SourceReconciler) checkIfSourceGenerationsIsIncreased(source *v1alpha1.Source) bool { + return source.Generation != source.Status.ObservedGeneration +} + func (r *SourceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&computev1alpha1.Source{}). - Owns(&appsv1.StatefulSet{}). + For(&v1alpha1.Source{}). + Owns(&appsv1.StatefulSet{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Owns(&corev1.Service{}). Owns(&autov2beta2.HorizontalPodAutoscaler{}). Complete(r) diff --git a/controllers/spec/common.go b/controllers/spec/common.go index 440e83940..acb049025 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -21,13 +21,14 @@ import ( "encoding/json" "fmt" "reflect" + "sort" "strconv" "strings" "github.com/streamnative/function-mesh/api/v1alpha1" "github.com/streamnative/function-mesh/controllers/proto" - appsv1 "k8s.io/api/apps/v1" + autov2beta2 "k8s.io/api/autoscaling/v2beta2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -1086,3 +1087,137 @@ func getRuntimeLogConfigNames(java *v1alpha1.JavaRuntime, python *v1alpha1.Pytho } return logConfMap } + +func CheckIfStatefulSetSpecIsEqual(spec *appsv1.StatefulSetSpec, desiredSpec *appsv1.StatefulSetSpec) bool { + if *spec.Replicas != *desiredSpec.Replicas { + return false + } + + if len(spec.Template.Spec.Containers) != len(desiredSpec.Template.Spec.Containers) { + return false + } + + for _, container := range spec.Template.Spec.Containers { + containerMatch := false + for _, desiredContainer := range desiredSpec.Template.Spec.Containers { + if container.Name == desiredContainer.Name { + containerMatch = true + // sort container ports + ports := container.Ports + desiredPorts := desiredContainer.Ports + sort.Slice(ports, func(i, j int) bool { + return ports[i].Name < ports[j].Name + }) + sort.Slice(desiredPorts, func(i, j int) bool { + return desiredPorts[i].Name < desiredPorts[j].Name + }) + // sort container envFrom + containerEnvFrom := container.EnvFrom + desiredContainerEnvFrom := desiredContainer.EnvFrom + sort.Slice(containerEnvFrom, func(i, j int) bool { + return containerEnvFrom[i].Prefix < containerEnvFrom[j].Prefix + }) + sort.Slice(desiredContainerEnvFrom, func(i, j int) bool { + return desiredContainerEnvFrom[i].Prefix < desiredContainerEnvFrom[j].Prefix + }) + + if !reflect.DeepEqual(container.Command, desiredContainer.Command) || + container.Image != desiredContainer.Image || + container.ImagePullPolicy != desiredContainer.ImagePullPolicy || + !reflect.DeepEqual(ports, desiredPorts) || + !reflect.DeepEqual(containerEnvFrom, desiredContainerEnvFrom) || + !reflect.DeepEqual(container.Resources, desiredContainer.Resources) { + return false + } + + if len(container.Env) != len(desiredContainer.Env) { + return false + } + } + } + if !containerMatch { + return false + } + } + return true +} + +func CheckIfHPASpecIsEqual(spec *autov2beta2.HorizontalPodAutoscalerSpec, desiredSpec *autov2beta2.HorizontalPodAutoscalerSpec) bool { + if spec.MaxReplicas != desiredSpec.MaxReplicas || *spec.MinReplicas != *desiredSpec.MinReplicas { + return false + } + if !reflect.DeepEqual(spec.Metrics, desiredSpec.Metrics) { + return false + } + if objectXOROperator(spec.Behavior, desiredSpec.Behavior) { + return false + } + if desiredSpec.Behavior != nil { + if objectXOROperator(spec.Behavior.ScaleUp, desiredSpec.Behavior.ScaleUp) || + objectXOROperator(spec.Behavior.ScaleDown, desiredSpec.Behavior.ScaleDown) { + return false + } + if desiredSpec.Behavior.ScaleUp != nil { + if objectXOROperator(spec.Behavior.ScaleUp.StabilizationWindowSeconds, desiredSpec.Behavior.ScaleUp.StabilizationWindowSeconds) || + objectXOROperator(spec.Behavior.ScaleUp.SelectPolicy, desiredSpec.Behavior.ScaleUp.SelectPolicy) || + objectXOROperator(spec.Behavior.ScaleUp.Policies, desiredSpec.Behavior.ScaleUp.Policies) { + return false + } + if desiredSpec.Behavior.ScaleUp.StabilizationWindowSeconds != nil && *desiredSpec.Behavior.ScaleUp.StabilizationWindowSeconds != *spec.Behavior.ScaleUp.StabilizationWindowSeconds { + return false + } + if desiredSpec.Behavior.ScaleUp.SelectPolicy != nil && *desiredSpec.Behavior.ScaleUp.SelectPolicy != *spec.Behavior.ScaleUp.SelectPolicy { + return false + } + // sort policies + desiredPolicies := desiredSpec.Behavior.ScaleUp.Policies + specPolicies := spec.Behavior.ScaleUp.Policies + sort.Slice(desiredPolicies, func(i, j int) bool { + return desiredPolicies[i].Type < desiredPolicies[j].Type + }) + sort.Slice(specPolicies, func(i, j int) bool { + return specPolicies[i].Type < specPolicies[j].Type + }) + if !reflect.DeepEqual(desiredPolicies, specPolicies) { + return false + } + } + if desiredSpec.Behavior.ScaleDown != nil { + if objectXOROperator(spec.Behavior.ScaleDown.StabilizationWindowSeconds, desiredSpec.Behavior.ScaleDown.StabilizationWindowSeconds) || + objectXOROperator(spec.Behavior.ScaleDown.SelectPolicy, desiredSpec.Behavior.ScaleDown.SelectPolicy) || + objectXOROperator(spec.Behavior.ScaleDown.Policies, desiredSpec.Behavior.ScaleDown.Policies) { + return false + } + if desiredSpec.Behavior.ScaleDown.StabilizationWindowSeconds != nil && *desiredSpec.Behavior.ScaleDown.StabilizationWindowSeconds != *spec.Behavior.ScaleDown.StabilizationWindowSeconds { + return false + } + if desiredSpec.Behavior.ScaleDown.SelectPolicy != nil && *desiredSpec.Behavior.ScaleDown.SelectPolicy != *spec.Behavior.ScaleDown.SelectPolicy { + return false + } + // sort policies + desiredPolicies := desiredSpec.Behavior.ScaleDown.Policies + specPolicies := spec.Behavior.ScaleDown.Policies + sort.Slice(desiredPolicies, func(i, j int) bool { + return desiredPolicies[i].Type < desiredPolicies[j].Type + }) + sort.Slice(specPolicies, func(i, j int) bool { + return specPolicies[i].Type < specPolicies[j].Type + }) + if !reflect.DeepEqual(desiredPolicies, specPolicies) { + return false + } + } + } + return true +} + +func objectXOROperator(first interface{}, second interface{}) bool { + return (first != nil && second == nil) || (first == nil && second != nil) +} + +func GetNamespacedName(object metav1.Object, component string) string { + if len(object.GetNamespace()) > 0 { + return component + "/" + object.GetNamespace() + "/" + object.GetName() + } + return component + "/" + object.GetName() +} diff --git a/controllers/spec/common_test.go b/controllers/spec/common_test.go index c72b7d5dd..41db19d14 100644 --- a/controllers/spec/common_test.go +++ b/controllers/spec/common_test.go @@ -209,6 +209,7 @@ func makeSampleObjectMeta(name string) *metav1.ObjectMeta { func makeGoFunctionSample(functionName string) *v1alpha1.Function { maxPending := int32(1000) replicas := int32(1) + minReplicas := int32(1) maxReplicas := int32(5) trueVal := true return &v1alpha1.Function{ @@ -234,6 +235,7 @@ func makeGoFunctionSample(functionName string) *v1alpha1.Function { MaxMessageRetry: 0, ForwardSourceMessageProperty: &trueVal, Replicas: &replicas, + MinReplicas: &minReplicas, MaxReplicas: &maxReplicas, AutoAck: &trueVal, MaxPendingAsyncRequests: &maxPending, diff --git a/controllers/spec/function.go b/controllers/spec/function.go index e7395fbdf..407446f04 100644 --- a/controllers/spec/function.go +++ b/controllers/spec/function.go @@ -37,13 +37,13 @@ func MakeFunctionHPA(function *v1alpha1.Function) *autov2beta2.HorizontalPodAuto Name: function.Name, APIVersion: function.APIVersion, } - if isBuiltinHPAEnabled(function.Spec.Replicas, function.Spec.MaxReplicas, function.Spec.Pod) { - return makeBuiltinHPA(objectMeta, *function.Spec.Replicas, *function.Spec.MaxReplicas, targetRef, + if isBuiltinHPAEnabled(function.Spec.MinReplicas, function.Spec.MaxReplicas, function.Spec.Pod) { + return makeBuiltinHPA(objectMeta, *function.Spec.MinReplicas, *function.Spec.MaxReplicas, targetRef, function.Spec.Pod.BuiltinAutoscaler) - } else if !isDefaultHPAEnabled(function.Spec.Replicas, function.Spec.MaxReplicas, function.Spec.Pod) { - return makeHPA(objectMeta, *function.Spec.Replicas, *function.Spec.MaxReplicas, function.Spec.Pod, targetRef) + } else if !isDefaultHPAEnabled(function.Spec.MinReplicas, function.Spec.MaxReplicas, function.Spec.Pod) { + return makeHPA(objectMeta, *function.Spec.MinReplicas, *function.Spec.MaxReplicas, function.Spec.Pod, targetRef) } - return makeDefaultHPA(objectMeta, *function.Spec.Replicas, *function.Spec.MaxReplicas, targetRef) + return makeDefaultHPA(objectMeta, *function.Spec.MinReplicas, *function.Spec.MaxReplicas, targetRef) } func MakeFunctionService(function *v1alpha1.Function) *corev1.Service { diff --git a/controllers/spec/function_test.go b/controllers/spec/function_test.go index 0a97d12e1..dd722634c 100644 --- a/controllers/spec/function_test.go +++ b/controllers/spec/function_test.go @@ -40,6 +40,7 @@ func TestCreateFunctionDetailsForStatefulFunction(t *testing.T) { func makeFunctionSample(functionName string) *v1alpha1.Function { maxPending := int32(1000) replicas := int32(1) + minReplicas := int32(1) maxReplicas := int32(5) trueVal := true return &v1alpha1.Function{ @@ -68,6 +69,7 @@ func makeFunctionSample(functionName string) *v1alpha1.Function { MaxMessageRetry: 0, ForwardSourceMessageProperty: &trueVal, Replicas: &replicas, + MinReplicas: &minReplicas, MaxReplicas: &maxReplicas, AutoAck: &trueVal, MaxPendingAsyncRequests: &maxPending, diff --git a/controllers/spec/sink.go b/controllers/spec/sink.go index e9c1563c8..b997979e6 100644 --- a/controllers/spec/sink.go +++ b/controllers/spec/sink.go @@ -33,13 +33,13 @@ func MakeSinkHPA(sink *v1alpha1.Sink) *autov2beta2.HorizontalPodAutoscaler { Name: sink.Name, APIVersion: sink.APIVersion, } - if isBuiltinHPAEnabled(sink.Spec.Replicas, sink.Spec.MaxReplicas, sink.Spec.Pod) { - return makeBuiltinHPA(objectMeta, *sink.Spec.Replicas, *sink.Spec.MaxReplicas, targetRef, + if isBuiltinHPAEnabled(sink.Spec.MinReplicas, sink.Spec.MaxReplicas, sink.Spec.Pod) { + return makeBuiltinHPA(objectMeta, *sink.Spec.MinReplicas, *sink.Spec.MaxReplicas, targetRef, sink.Spec.Pod.BuiltinAutoscaler) - } else if !isDefaultHPAEnabled(sink.Spec.Replicas, sink.Spec.MaxReplicas, sink.Spec.Pod) { - return makeHPA(objectMeta, *sink.Spec.Replicas, *sink.Spec.MaxReplicas, sink.Spec.Pod, targetRef) + } else if !isDefaultHPAEnabled(sink.Spec.MinReplicas, sink.Spec.MaxReplicas, sink.Spec.Pod) { + return makeHPA(objectMeta, *sink.Spec.MinReplicas, *sink.Spec.MaxReplicas, sink.Spec.Pod, targetRef) } - return makeDefaultHPA(objectMeta, *sink.Spec.Replicas, *sink.Spec.MaxReplicas, targetRef) + return makeDefaultHPA(objectMeta, *sink.Spec.MinReplicas, *sink.Spec.MaxReplicas, targetRef) } func MakeSinkService(sink *v1alpha1.Sink) *corev1.Service { diff --git a/controllers/spec/source.go b/controllers/spec/source.go index 099cce5d7..766b196d0 100644 --- a/controllers/spec/source.go +++ b/controllers/spec/source.go @@ -33,13 +33,13 @@ func MakeSourceHPA(source *v1alpha1.Source) *autov2beta2.HorizontalPodAutoscaler Name: source.Name, APIVersion: source.APIVersion, } - if isBuiltinHPAEnabled(source.Spec.Replicas, source.Spec.MaxReplicas, source.Spec.Pod) { - return makeBuiltinHPA(objectMeta, *source.Spec.Replicas, *source.Spec.MaxReplicas, targetRef, + if isBuiltinHPAEnabled(source.Spec.MinReplicas, source.Spec.MaxReplicas, source.Spec.Pod) { + return makeBuiltinHPA(objectMeta, *source.Spec.MinReplicas, *source.Spec.MaxReplicas, targetRef, source.Spec.Pod.BuiltinAutoscaler) - } else if !isDefaultHPAEnabled(source.Spec.Replicas, source.Spec.MaxReplicas, source.Spec.Pod) { - return makeHPA(objectMeta, *source.Spec.Replicas, *source.Spec.MaxReplicas, source.Spec.Pod, targetRef) + } else if !isDefaultHPAEnabled(source.Spec.MinReplicas, source.Spec.MaxReplicas, source.Spec.Pod) { + return makeHPA(objectMeta, *source.Spec.MinReplicas, *source.Spec.MaxReplicas, source.Spec.Pod, targetRef) } - return makeDefaultHPA(objectMeta, *source.Spec.Replicas, *source.Spec.MaxReplicas, targetRef) + return makeDefaultHPA(objectMeta, *source.Spec.MinReplicas, *source.Spec.MaxReplicas, targetRef) } func MakeSourceService(source *v1alpha1.Source) *corev1.Service { diff --git a/controllers/test_utils_test.go b/controllers/test_utils_test.go index f64d88a8c..d605d3506 100644 --- a/controllers/test_utils_test.go +++ b/controllers/test_utils_test.go @@ -62,6 +62,7 @@ func makeSamplePulsarConfig() *v1.ConfigMap { func makeFunctionSample(functionName string) *v1alpha1.Function { maxPending := int32(1000) replicas := int32(1) + minReplicas := int32(1) maxReplicas := int32(5) trueVal := true return &v1alpha1.Function{ @@ -90,6 +91,7 @@ func makeFunctionSample(functionName string) *v1alpha1.Function { MaxMessageRetry: 0, ForwardSourceMessageProperty: &trueVal, Replicas: &replicas, + MinReplicas: &minReplicas, MaxReplicas: &maxReplicas, AutoAck: &trueVal, MaxPendingAsyncRequests: &maxPending, @@ -183,6 +185,7 @@ func makeFunctionMeshSample() *v1alpha1.FunctionMesh { func makeSinkSample() *v1alpha1.Sink { replicas := int32(1) + minReplicas := int32(1) maxReplicas := int32(1) trueVal := true sinkConfig := v1alpha1.NewConfig(map[string]interface{}{ @@ -213,6 +216,7 @@ func makeSinkSample() *v1alpha1.Sink { Timeout: 0, MaxMessageRetry: 0, Replicas: &replicas, + MinReplicas: &minReplicas, MaxReplicas: &maxReplicas, AutoAck: &trueVal, Messaging: v1alpha1.Messaging{ @@ -234,6 +238,7 @@ func makeSinkSample() *v1alpha1.Sink { func makeSourceSample() *v1alpha1.Source { replicas := int32(1) + minReplicas := int32(1) maxReplicas := int32(1) sourceConfig := v1alpha1.NewConfig(map[string]interface{}{ "mongodb.hosts": "rs0/mongo-dbz-0.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-1.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-2.mongo.default.svc.cluster.local:27017", @@ -266,6 +271,7 @@ func makeSourceSample() *v1alpha1.Source { }, SourceConfig: &sourceConfig, Replicas: &replicas, + MinReplicas: &minReplicas, MaxReplicas: &maxReplicas, Messaging: v1alpha1.Messaging{ Pulsar: &v1alpha1.PulsarMessaging{