Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve the stability of autoscaling when HPA is enabled #450

Merged
merged 7 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions api/v1alpha1/function_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ type FunctionSpec struct {
Tenant string `json:"tenant,omitempty"`
Namespace string `json:"namespace,omitempty"`
ClusterName string `json:"clusterName,omitempty"`
// +kubebuilder:validation:Required
// +kubebuilder:validation:Minimum=1
Replicas *int32 `json:"replicas"`

Replicas *int32 `json:"replicas,omitempty"`
// +kubebuilder:validation:Minimum=1
MinReplicas *int32 `json:"minReplicas,omitempty"`
// MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler
// If provided, a default HPA with CPU at average of 80% will be used.
// For complex HPA strategies, please refer to Pod.HPAutoscaler.
Expand Down Expand Up @@ -93,9 +93,10 @@ type FunctionSpec struct {
type FunctionStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Conditions map[Component]ResourceCondition `json:"conditions"`
Replicas int32 `json:"replicas"`
Selector string `json:"selector"`
Conditions map[Component]ResourceCondition `json:"conditions"`
Replicas int32 `json:"replicas"`
Selector string `json:"selector"`
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
18 changes: 14 additions & 4 deletions api/v1alpha1/function_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,19 @@ var _ webhook.Defaulter = &Function{}
func (r *Function) Default() {
functionlog.Info("default", "name", r.Name)

if r.Spec.Replicas == nil {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = 1
if !(r.Spec.Replicas != nil && r.Spec.MinReplicas != nil) {
if r.Spec.MinReplicas != nil && r.Spec.Replicas == nil {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = *r.Spec.MinReplicas
} else if r.Spec.MinReplicas == nil && r.Spec.Replicas != nil {
r.Spec.MinReplicas = new(int32)
*r.Spec.MinReplicas = *r.Spec.Replicas
} else {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = 1
r.Spec.MinReplicas = new(int32)
*r.Spec.MinReplicas = 1
}
}

if r.Spec.AutoAck == nil {
Expand Down Expand Up @@ -163,7 +173,7 @@ func (r *Function) ValidateCreate() error {
allErrs = append(allErrs, fieldErrs...)
}

fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas)
fieldErrs = validateReplicasAndMinReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MinReplicas, r.Spec.MaxReplicas)
if len(fieldErrs) > 0 {
allErrs = append(allErrs, fieldErrs...)
}
Expand Down
1 change: 1 addition & 0 deletions api/v1alpha1/functionmesh_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type FunctionMeshStatus struct {
SourceConditions map[string]ResourceCondition `json:"sourceConditions,omitempty"`
SinkConditions map[string]ResourceCondition `json:"sinkConditions,omitempty"`
FunctionConditions map[string]ResourceCondition `json:"functionConditions,omitempty"`
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
13 changes: 7 additions & 6 deletions api/v1alpha1/sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ type SinkSpec struct {
Tenant string `json:"tenant,omitempty"`
Namespace string `json:"namespace,omitempty"`
SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector
// +kubebuilder:validation:Required
// +kubebuilder:validation:Minimum=1
Replicas *int32 `json:"replicas"`

Replicas *int32 `json:"replicas,omitempty"`
// +kubebuilder:validation:Minimum=1
MinReplicas *int32 `json:"minReplicas,omitempty"`
// MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler
// If provided, a default HPA with CPU at average of 80% will be used.
// For complex HPA strategies, please refer to Pod.HPAutoscaler.
Expand Down Expand Up @@ -85,9 +85,10 @@ type SinkSpec struct {
type SinkStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Conditions map[Component]ResourceCondition `json:"conditions"`
Replicas int32 `json:"replicas"`
Selector string `json:"selector"`
Conditions map[Component]ResourceCondition `json:"conditions"`
Replicas int32 `json:"replicas"`
Selector string `json:"selector"`
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
18 changes: 14 additions & 4 deletions api/v1alpha1/sink_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,19 @@ var _ webhook.Defaulter = &Sink{}
func (r *Sink) Default() {
sinklog.Info("default", "name", r.Name)

if r.Spec.Replicas == nil {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = 1
if !(r.Spec.Replicas != nil && r.Spec.MinReplicas != nil) {
if r.Spec.MinReplicas != nil && r.Spec.Replicas == nil {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = *r.Spec.MinReplicas
} else if r.Spec.MinReplicas == nil && r.Spec.Replicas != nil {
r.Spec.MinReplicas = new(int32)
*r.Spec.MinReplicas = *r.Spec.Replicas
} else {
r.Spec.Replicas = new(int32)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will FM still have the HPA error since the replicas are set to 1 even user doesn't set it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this PR, spec.replicas becomes optional, so that the user does not need to configure this value (as in the argo template), but only needs to configure a minimum value of spec.minReplicas.

If spec.replicas is changed under HPA control, applying the template file again (without spec.replicas configured in it) will not trigger reconciliation.

*r.Spec.Replicas = 1
r.Spec.MinReplicas = new(int32)
*r.Spec.MinReplicas = 1
}
}

if r.Spec.AutoAck == nil {
Expand Down Expand Up @@ -129,7 +139,7 @@ func (r *Sink) ValidateCreate() error {
allErrs = append(allErrs, fieldErrs...)
}

fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas)
fieldErrs = validateReplicasAndMinReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MinReplicas, r.Spec.MaxReplicas)
if len(fieldErrs) > 0 {
allErrs = append(allErrs, fieldErrs...)
}
Expand Down
14 changes: 8 additions & 6 deletions api/v1alpha1/source_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ type SourceSpec struct {
Namespace string `json:"namespace,omitempty"`
ClusterName string `json:"clusterName,omitempty"`
SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector
// +kubebuilder:validation:Required
Replicas *int32 `json:"replicas"`

// +kubebuilder:validation:Minimum=1
Replicas *int32 `json:"replicas,omitempty"`
// +kubebuilder:validation:Minimum=1
MinReplicas *int32 `json:"minReplicas,omitempty"`
// MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler
// If provided, a default HPA with CPU at average of 80% will be used.
// For complex HPA strategies, please refer to Pod.HPAutoscaler.
Expand Down Expand Up @@ -74,9 +75,10 @@ type SourceSpec struct {
type SourceStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Conditions map[Component]ResourceCondition `json:"conditions"`
Replicas int32 `json:"replicas"`
Selector string `json:"selector"`
Conditions map[Component]ResourceCondition `json:"conditions"`
Replicas int32 `json:"replicas"`
Selector string `json:"selector"`
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
18 changes: 14 additions & 4 deletions api/v1alpha1/source_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,19 @@ var _ webhook.Defaulter = &Source{}
func (r *Source) Default() {
sourcelog.Info("default", "name", r.Name)

if r.Spec.Replicas == nil {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = 1
if !(r.Spec.Replicas != nil && r.Spec.MinReplicas != nil) {
if r.Spec.MinReplicas != nil && r.Spec.Replicas == nil {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = *r.Spec.MinReplicas
} else if r.Spec.MinReplicas == nil && r.Spec.Replicas != nil {
r.Spec.MinReplicas = new(int32)
*r.Spec.MinReplicas = *r.Spec.Replicas
} else {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = 1
r.Spec.MinReplicas = new(int32)
*r.Spec.MinReplicas = 1
}
}

if r.Spec.ProcessingGuarantee == "" {
Expand Down Expand Up @@ -139,7 +149,7 @@ func (r *Source) ValidateCreate() error {
allErrs = append(allErrs, fieldErrs...)
}

fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas)
fieldErrs = validateReplicasAndMinReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MinReplicas, r.Spec.MaxReplicas)
if len(fieldErrs) > 0 {
allErrs = append(allErrs, fieldErrs...)
}
Expand Down
27 changes: 22 additions & 5 deletions api/v1alpha1/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ func validateGolangRuntime(golang *GoRuntime) []*field.Error {
return allErrs
}

func validateReplicasAndMaxReplicas(replicas, maxReplicas *int32) []*field.Error {
func validateReplicasAndMinReplicasAndMaxReplicas(replicas, minReplicas, maxReplicas *int32) []*field.Error {
var allErrs field.ErrorList
// TODO: allow 0 replicas, currently hpa's min value has to be 1
if replicas == nil {
e := field.Invalid(field.NewPath("spec").Child("replicas"), nil, "replicas cannot be nil")
allErrs = append(allErrs, e)
}
//if replicas == nil {
// e := field.Invalid(field.NewPath("spec").Child("replicas"), nil, "replicas cannot be nil")
// allErrs = append(allErrs, e)
//}

if replicas != nil && *replicas <= 0 {
e := field.Invalid(field.NewPath("spec").Child("replicas"), *replicas, "replicas cannot be zero or negative")
Expand All @@ -109,6 +109,23 @@ func validateReplicasAndMaxReplicas(replicas, maxReplicas *int32) []*field.Error
"maxReplicas must be greater than or equal to replicas")
allErrs = append(allErrs, e)
}

if minReplicas != nil && *minReplicas <= 0 {
e := field.Invalid(field.NewPath("spec").Child("minReplicas"), *replicas, "minReplicas cannot be zero or negative")
allErrs = append(allErrs, e)
}

if minReplicas != nil && replicas != nil && *minReplicas > *replicas {
e := field.Invalid(field.NewPath("spec").Child("minReplicas"), *replicas,
"minReplicas must be less than or equal to replicas")
allErrs = append(allErrs, e)
}

if minReplicas != nil && maxReplicas != nil && *minReplicas > *maxReplicas {
e := field.Invalid(field.NewPath("spec").Child("minReplicas"), *maxReplicas,
"minReplicas must be less than or equal to maxReplicas")
allErrs = append(allErrs, e)
}
return allErrs
}

Expand Down
15 changes: 15 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ spec:
maxReplicas:
format: int32
type: integer
minReplicas:
format: int32
minimum: 1
type: integer
name:
type: string
namespace:
Expand Down Expand Up @@ -2783,8 +2787,6 @@ spec:
- name
type: object
type: array
required:
- replicas
type: object
type: array
sinks:
Expand Down Expand Up @@ -2952,6 +2954,10 @@ spec:
maxReplicas:
format: int32
type: integer
minReplicas:
format: int32
minimum: 1
type: integer
name:
type: string
namespace:
Expand Down Expand Up @@ -5462,8 +5468,6 @@ spec:
- name
type: object
type: array
required:
- replicas
type: object
type: array
sources:
Expand Down Expand Up @@ -5552,6 +5556,10 @@ spec:
maxReplicas:
format: int32
type: integer
minReplicas:
format: int32
minimum: 1
type: integer
name:
type: string
namespace:
Expand Down Expand Up @@ -8047,6 +8055,7 @@ spec:
type: object
replicas:
format: int32
minimum: 1
type: integer
resources:
properties:
Expand Down Expand Up @@ -8105,8 +8114,6 @@ spec:
- name
type: object
type: array
required:
- replicas
type: object
type: array
type: object
Expand All @@ -8123,6 +8130,9 @@ spec:
type: string
type: object
type: object
observedGeneration:
format: int64
type: integer
sinkConditions:
additionalProperties:
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ spec:
maxReplicas:
format: int32
type: integer
minReplicas:
format: int32
minimum: 1
type: integer
name:
type: string
namespace:
Expand Down Expand Up @@ -2802,8 +2806,6 @@ spec:
- name
type: object
type: array
required:
- replicas
type: object
status:
properties:
Expand All @@ -2818,6 +2820,9 @@ spec:
type: string
type: object
type: object
observedGeneration:
format: int64
type: integer
replicas:
format: int32
type: integer
Expand Down
Loading