Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions api/operator/v1beta1/vmextra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ const (
SkipValidationValue = "true"
AdditionalServiceLabel = "operator.victoriametrics.com/additional-service"
// PVCExpandableLabel controls checks for storageClass
PVCExpandableLabel = "operator.victoriametrics.com/pvc-allow-volume-expansion"
lastAppliedSpecAnnotationName = "operator.victoriametrics/last-applied-spec"
PVCExpandableLabel = "operator.victoriametrics.com/pvc-allow-volume-expansion"
LastAppliedSpecAnnotation = "operator.victoriametrics/last-applied-spec"
)

const (
Expand Down Expand Up @@ -1034,21 +1034,21 @@ type objectWithLastAppliedState[T, ST any] interface {

// ParseLastAppliedStateTo parses spec from provided CR annotations and sets it to the given CR
func ParseLastAppliedStateTo[T objectWithLastAppliedState[T, ST], ST any](cr T) error {
lastAppliedSpecJSON := cr.GetAnnotations()[lastAppliedSpecAnnotationName]
lastAppliedSpecJSON := cr.GetAnnotations()[LastAppliedSpecAnnotation]
if len(lastAppliedSpecJSON) == 0 {
return nil
}
var dst ST
if err := json.Unmarshal([]byte(lastAppliedSpecJSON), &dst); err != nil {
return fmt.Errorf("cannot parse last applied spec annotation=%q, remove this annotation manually from object : %w", lastAppliedSpecAnnotationName, err)
return fmt.Errorf("cannot parse last applied spec annotation=%q, remove this annotation manually from object : %w", LastAppliedSpecAnnotation, err)
}
cr.SetLastSpec(dst)
return nil
}

// HasSpecChanges compares single spec with last applied single spec stored in annotation
func HasStateChanges(crMeta metav1.ObjectMeta, spec any) (bool, error) {
lastAppliedSpecJSON := crMeta.GetAnnotations()[lastAppliedSpecAnnotationName]
lastAppliedSpecJSON := crMeta.GetAnnotations()[LastAppliedSpecAnnotation]
if len(lastAppliedSpecJSON) == 0 {
return true, nil
}
Expand All @@ -1070,7 +1070,7 @@ func LastAppliedChangesAsPatch(crMeta metav1.ObjectMeta, spec any) (client.Patch
if err != nil {
return nil, fmt.Errorf("possible bug, cannot serialize single specification as json :%w", err)
}
patch := fmt.Sprintf(`{"metadata":{"annotations":{%q: %q }}}`, lastAppliedSpecAnnotationName, data)
patch := fmt.Sprintf(`{"metadata":{"annotations":{%q: %q }}}`, LastAppliedSpecAnnotation, data)
return client.RawPatch(types.MergePatchType, []byte(patch)), nil

}
Expand Down
75 changes: 42 additions & 33 deletions internal/controller/operator/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/predicate"
k8sreconcile "sigs.k8s.io/controller-runtime/pkg/reconcile"

vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1"
Expand Down Expand Up @@ -363,60 +365,67 @@ func reconcileAndTrackStatus[T client.Object, ST reconcile.StatusWithMetadata[ST
resultErr = fmt.Errorf("cannot parse exist spec changes")
return
}
var diffPatch client.Patch
resultStatus := vmv1beta1.UpdateStatusOperational
if specChanged {
diffPatch, err = object.LastAppliedSpecAsPatch()
if err != nil {
resultErr = fmt.Errorf("cannot parse last applied spec for cluster: %w", err)
return
}

if err := reconcile.UpdateObjectStatus(ctx, c, object, vmv1beta1.UpdateStatusExpanding, nil); err != nil {
resultErr = fmt.Errorf("failed to update object status: %w", err)
return
}
// update lastAppliedSpec as soon as operator receives it
// it allows to properly build diff with previous object state
// and rollback bad configurations
if err := c.Patch(ctx, object, diffPatch); err != nil {
resultErr = fmt.Errorf("cannot update cluster with last applied spec: %w", err)
return
}
if err := createGenericEventForObject(ctx, c, object, "starting object update"); err != nil {
logger.WithContext(ctx).Error(err, " cannot create k8s api event")
}
logger.WithContext(ctx).Info("object has changes with previous state, applying changes")
}
defer func() {
prevStatus := object.GetStatus().GetStatusMetadata().UpdateStatus
if specChanged && prevStatus != resultStatus {
newPatch, err := object.LastAppliedSpecAsPatch()
if err != nil {
resultErr = fmt.Errorf("cannot parse last applied spec: %w", err)
return
}
if err := c.Patch(ctx, object, newPatch); err != nil {
resultErr = fmt.Errorf("cannot update resource with last applied spec: %w", err)
return
}
}
if err := reconcile.UpdateObjectStatus(ctx, c, object, resultStatus, resultErr); err != nil {
resultErr = fmt.Errorf("failed to update object status: %w", err)
return
}
}()

result, err = cb()
if err != nil {
// do not change status on conflict to failed
// do not change status on conflict or timeout to failed
// it should be retried on the next loop
if k8serrors.IsConflict(err) {
return
if k8serrors.IsConflict(err) || reconcile.IsErrorWaitTimeout(err) {
resultStatus = vmv1beta1.UpdateStatusExpanding
} else {
resultStatus = vmv1beta1.UpdateStatusFailed
resultErr = err
}
desiredStatus := vmv1beta1.UpdateStatusFailed
if reconcile.IsErrorWaitTimeout(err) {
desiredStatus = vmv1beta1.UpdateStatusExpanding
err = nil
}
if updateErr := reconcile.UpdateObjectStatus(ctx, c, object, desiredStatus, err); updateErr != nil {
resultErr = fmt.Errorf("failed to update object status: %q, origin err: %w", updateErr, err)
return
}

return result, err
return
}
if specChanged {
if err := createGenericEventForObject(ctx, c, object, "reconcile of object finished successfully"); err != nil {
logger.WithContext(ctx).Error(err, " cannot create k8s api event")
}
logger.WithContext(ctx).Info("object was successfully reconciled")
}
if err := reconcile.UpdateObjectStatus(ctx, c, object, vmv1beta1.UpdateStatusOperational, nil); err != nil {
resultErr = fmt.Errorf("failed to update object status: %w", err)
return
}

return result, nil
}

var patchAnnotationPredicate = predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return true
}
oldAnnotations := e.ObjectOld.GetAnnotations()
newAnnotations := e.ObjectNew.GetAnnotations()
if oldAnnotations[vmv1beta1.LastAppliedSpecAnnotation] != newAnnotations[vmv1beta1.LastAppliedSpecAnnotation] {
return false
}
return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration()
},
}
67 changes: 17 additions & 50 deletions internal/controller/operator/factory/reconcile/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package reconcile

import (
"context"
"encoding/json"
"fmt"
"math/rand/v2"
"reflect"
Expand Down Expand Up @@ -192,72 +191,40 @@ type StatusWithMetadata[T any] interface {
GetStatusMetadata() *vmv1beta1.StatusMetadata
}

// UpdateStatus reconcile provided object status with given actualStatus status
func UpdateObjectStatus[T client.Object, ST StatusWithMetadata[STC], STC any](ctx context.Context, rclient client.Client, object ObjectWithDeepCopyAndStatus[T, ST, STC], actualStatus vmv1beta1.UpdateStatus, maybeErr error) error {
currentStatus := object.GetStatus()
prevStatus := currentStatus.DeepCopy()
currMeta := currentStatus.GetStatusMetadata()
newUpdateStatus := actualStatus
// UpdateStatus reconcile provided object status with given newUpdateStatus status
func UpdateObjectStatus[T client.Object, ST StatusWithMetadata[STC], STC any](ctx context.Context, rclient client.Client, object ObjectWithDeepCopyAndStatus[T, ST, STC], newUpdateStatus vmv1beta1.UpdateStatus, maybeErr error) error {
patch := client.MergeFrom(object.DeepCopy())
newObjectStatus := object.GetStatus()
prevObjectStatus := newObjectStatus.DeepCopy()
newObjectMeta := newObjectStatus.GetStatusMetadata()

switch actualStatus {
switch newUpdateStatus {
case vmv1beta1.UpdateStatusExpanding, vmv1beta1.UpdateStatusOperational:
currMeta.Reason = ""
newObjectMeta.Reason = ""
case vmv1beta1.UpdateStatusPaused:
case vmv1beta1.UpdateStatusFailed:
if maybeErr != nil {
currMeta.Reason = maybeErr.Error()
newObjectMeta.Reason = maybeErr.Error()
}
default:
panic(fmt.Sprintf("BUG: not expected status=%q", actualStatus))
panic(fmt.Sprintf("BUG: not expected status=%q", newUpdateStatus))
}

currMeta.ObservedGeneration = object.GetGeneration()
object.DefaultStatusFields(currentStatus)
// if opts.mutateCurrentBeforeCompare != nil {
// opts.mutateCurrentBeforeCompare(opts.crStatus.(ST))
// }
object.DefaultStatusFields(newObjectStatus)
newObjectMeta.ObservedGeneration = object.GetGeneration()
newObjectMeta.UpdateStatus = newUpdateStatus
// compare before send update request
// it reduces load at kubernetes api-server
if equality.Semantic.DeepEqual(currentStatus, prevStatus) && currMeta.UpdateStatus == actualStatus {
if equality.Semantic.DeepEqual(newObjectStatus, prevObjectStatus) {
return nil
}
currMeta.UpdateStatus = newUpdateStatus

// make a deep copy before passing object to Patch function
// it reload state of the object from API server
// which is not desired behaviour
objecToUpdate := object.DeepCopy()
pr, err := buildStatusPatch(currentStatus)
if err != nil {
return err
}
if err := rclient.Status().Patch(ctx, objecToUpdate, pr); err != nil {
objectToUpdate := object.DeepCopy()
if err := rclient.Status().Patch(ctx, objectToUpdate, patch); err != nil {
return fmt.Errorf("cannot update resource status with patch: %w", err)
}
// Update ResourceVersion in order to resolve future conflicts
object.SetResourceVersion(objecToUpdate.GetResourceVersion())
object.SetResourceVersion(objectToUpdate.GetResourceVersion())

return nil
}

func buildStatusPatch(currentStatus any) (client.Patch, error) {
type patch struct {
OP string `json:"op"`
Path string `json:"path"`
Value any `json:"value"`
}
ops := []patch{
{
OP: "replace",
Path: "/status",
Value: currentStatus,
},
}
data, err := json.Marshal(ops)
if err != nil {
return nil, fmt.Errorf("possible bug, cannot serialize patch specification as json :%w", err)
}

return client.RawPatch(types.JSONPatchType, data), nil

}
1 change: 1 addition & 0 deletions internal/controller/operator/vlagent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,6 @@ func (r *VLAgentReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.ServiceAccount{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
1 change: 1 addition & 0 deletions internal/controller/operator/vlcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,6 @@ func (r *VLClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Service{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
1 change: 1 addition & 0 deletions internal/controller/operator/vlogs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,6 @@ func (r *VLogsReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.Deployment{}).
Owns(&corev1.ServiceAccount{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
1 change: 1 addition & 0 deletions internal/controller/operator/vlsingle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,6 @@ func (r *VLSingleReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
1 change: 1 addition & 0 deletions internal/controller/operator/vmagent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,5 +138,6 @@ func (r *VMAgentReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.ServiceAccount{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
1 change: 1 addition & 0 deletions internal/controller/operator/vmalert_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,6 @@ func (r *VMAlertReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.Deployment{}).
Owns(&corev1.ServiceAccount{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,6 @@ func (r *VMAlertmanagerReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.ServiceAccount{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
1 change: 1 addition & 0 deletions internal/controller/operator/vmanomaly_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,6 @@ func (r *VMAnomalyReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.ServiceAccount{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
1 change: 1 addition & 0 deletions internal/controller/operator/vmauth_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,6 @@ func (r *VMAuthReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.Deployment{}).
Owns(&corev1.ServiceAccount{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
1 change: 1 addition & 0 deletions internal/controller/operator/vmcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,6 @@ func (r *VMClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.Deployment{}).
Owns(&appsv1.StatefulSet{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
1 change: 1 addition & 0 deletions internal/controller/operator/vmsingle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,6 @@ func (r *VMSingleReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.Deployment{}).
Owns(&corev1.ServiceAccount{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
1 change: 1 addition & 0 deletions internal/controller/operator/vtcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,6 @@ func (r *VTClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Service{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
1 change: 1 addition & 0 deletions internal/controller/operator/vtsingle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,6 @@ func (r *VTSingleReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
WithOptions(getDefaultOptions()).
WithEventFilter(patchAnnotationPredicate).
Complete(r)
}
7 changes: 0 additions & 7 deletions test/e2e/childobjects/vmalertmanagerconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ var _ = Describe("test vmalertmanagerconfig Controller", Label("vm", "child", "a
amCfgs []*vmv1beta1.VMAlertmanagerConfig
}
type step struct {
setup func()
modify func()
verify func()
}
Expand All @@ -46,9 +45,6 @@ var _ = Describe("test vmalertmanagerconfig Controller", Label("vm", "child", "a
}
})
step := steps[0]
if step.setup != nil {
step.setup()
}
for _, am := range args.ams {
Expect(k8sClient.Create(ctx, am)).To(Succeed())
}
Expand All @@ -70,9 +66,6 @@ var _ = Describe("test vmalertmanagerconfig Controller", Label("vm", "child", "a
}
step.verify()
for _, step := range steps[1:] {
if step.setup != nil {
step.setup()
}
if step.modify != nil {
step.modify()
}
Expand Down
Loading
Loading