diff --git a/rollout/canary_test.go b/rollout/canary_test.go index ad3d9b4267..ddcfe1ec1e 100644 --- a/rollout/canary_test.go +++ b/rollout/canary_test.go @@ -3,16 +3,10 @@ package rollout import ( "encoding/json" "fmt" - "os" "strconv" "testing" "time" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - k8stesting "k8s.io/client-go/testing" - "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/apps/v1" @@ -2117,106 +2111,3 @@ func TestIsDynamicallyRollingBackToStable(t *testing.T) { }) } } - -func TestSyncRolloutWithConflictInScaleReplicaSet(t *testing.T) { - os.Setenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT", "true") - defer os.Unsetenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT") - - f := newFixture(t) - defer f.Close() - - steps := []v1alpha1.CanaryStep{ - { - SetWeight: int32Ptr(10), - }, { - Pause: &v1alpha1.RolloutPause{ - Duration: v1alpha1.DurationFromInt(10), - }, - }, - } - r1 := newCanaryRollout("foo", 10, nil, steps, int32Ptr(1), intstr.FromInt(1), intstr.FromInt(0)) - r1.Spec.Template.Labels["rollout.argoproj.io/foo"] = "bar" - - rs1 := newReplicaSetWithStatus(r1, 10, 10) - r1.Spec.Replicas = pointer.Int32(2) - f.kubeobjects = append(f.kubeobjects, rs1) - f.replicaSetLister = append(f.replicaSetLister, rs1) - - f.rolloutLister = append(f.rolloutLister, r1) - f.objects = append(f.objects, r1) - - f.expectPatchRolloutAction(r1) - f.expectUpdateReplicaSetAction(rs1) // attempt to scale replicaset but conflict - patchIndex := f.expectPatchReplicaSetAction(rs1) // instead of update patch replicaset - - key := fmt.Sprintf("%s/%s", r1.Namespace, r1.Name) - c, i, k8sI := f.newController(func() time.Duration { return 30 * time.Minute }) - - f.kubeclient.PrependReactor("update", "replicasets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { - return true, action.(k8stesting.UpdateAction).GetObject(), errors.NewConflict(schema.GroupResource{ - Group: "Apps", - Resource: "ReplicaSet", - }, action.(k8stesting.UpdateAction).GetObject().(*appsv1.ReplicaSet).Name, fmt.Errorf("test error")) - }) - - f.runController(key, true, false, c, i, k8sI) - - updatedRs := f.getPatchedReplicaSet(patchIndex) // minus one because update did not happen because conflict - assert.Equal(t, int32(2), *updatedRs.Spec.Replicas) -} - -func TestSyncRolloutWithConflictInSyncReplicaSetRevision(t *testing.T) { - os.Setenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT", "true") - defer os.Unsetenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT") - - f := newFixture(t) - defer f.Close() - - steps := []v1alpha1.CanaryStep{ - { - SetWeight: int32Ptr(10), - }, { - Pause: &v1alpha1.RolloutPause{ - Duration: v1alpha1.DurationFromInt(10), - }, - }, - } - r1 := newCanaryRollout("foo", 3, nil, steps, int32Ptr(1), intstr.FromInt(1), intstr.FromInt(0)) - r2 := bumpVersion(r1) - - rs1 := newReplicaSetWithStatus(r1, 3, 3) - rs2 := newReplicaSetWithStatus(r2, 3, 3) - rs2.Annotations["rollout.argoproj.io/revision"] = "1" - - f.kubeobjects = append(f.kubeobjects, rs1, rs2) - f.replicaSetLister = append(f.replicaSetLister, rs1, rs2) - - f.rolloutLister = append(f.rolloutLister, r2) - f.objects = append(f.objects, r2) - - key := fmt.Sprintf("%s/%s", r1.Namespace, r1.Name) - c, i, k8sI := f.newController(func() time.Duration { return 30 * time.Minute }) - - f.kubeclient.PrependReactor("update", "replicasets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { - return true, &appsv1.ReplicaSet{}, errors.NewConflict(schema.GroupResource{ - Group: "Apps", - Resource: "ReplicaSet", - }, action.(k8stesting.UpdateAction).GetObject().(*appsv1.ReplicaSet).Name, fmt.Errorf("test error")) - }) - - f.expectPatchRolloutAction(r2) - f.expectUpdateReplicaSetAction(rs1) // attempt to update replicaset revision but conflict - patchIndex1 := f.expectPatchReplicaSetAction(rs1) // instead of update patch replicaset - - f.expectUpdateReplicaSetAction(rs2) // attempt to scale replicaset but conflict - patchIndex2 := f.expectPatchReplicaSetAction(rs2) // instead of update patch replicaset - - f.runController(key, true, false, c, i, k8sI) - - updatedRs1 := f.getPatchedReplicaSet(patchIndex1) - assert.Equal(t, "2", updatedRs1.Annotations["rollout.argoproj.io/revision"]) - assert.Equal(t, int32(3), *updatedRs1.Spec.Replicas) - - updatedRs2 := f.getPatchedReplicaSet(patchIndex2) - assert.Equal(t, int32(0), *updatedRs2.Spec.Replicas) -} diff --git a/rollout/controller.go b/rollout/controller.go index 7131ee8546..7e94d7f789 100644 --- a/rollout/controller.go +++ b/rollout/controller.go @@ -4,16 +4,11 @@ import ( "context" "encoding/json" "fmt" - "os" "reflect" "strconv" - "strings" "sync" "time" - "github.com/argoproj/argo-rollouts/utils/annotations" - - "github.com/argoproj/argo-rollouts/utils/diff" "k8s.io/apimachinery/pkg/runtime/schema" "github.com/argoproj/argo-rollouts/pkg/apis/rollouts" @@ -21,13 +16,11 @@ import ( log "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - patchtypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" @@ -573,10 +566,6 @@ func (c *Controller) newRolloutContext(rollout *v1alpha1.Rollout) (*rolloutConte roCtx.stableRS = replicasetutil.GetStableRS(roCtx.rollout, roCtx.newRS, roCtx.olderRSs) roCtx.otherRSs = replicasetutil.GetOtherRSs(roCtx.rollout, roCtx.newRS, roCtx.stableRS, rsList) roCtx.allRSs = append(rsList, roCtx.newRS) - err := roCtx.replicaSetInformer.GetIndexer().Add(roCtx.newRS) - if err != nil { - return nil, err - } } if rolloututil.IsFullyPromoted(rollout) && roCtx.pauseContext.IsAborted() { @@ -995,91 +984,14 @@ func remarshalRollout(r *v1alpha1.Rollout) *v1alpha1.Rollout { return &remarshalled } -// updateReplicaSetWithPatch updates the replicaset using Update and on failure falls back to a patch this function only exists to make sure we always can update -// replicasets and to not get into an conflict loop updating replicasets. We should really look into a complete refactor of how rollouts handles replicasets such -// that we do not keep a fully replicaset on the rollout context under newRS and instead switch to a patch only based approach. -func (c *rolloutContext) updateReplicaSetFallbackToPatch(ctx context.Context, rs *appsv1.ReplicaSet) (*appsv1.ReplicaSet, error) { +// updateReplicaSet updates the replicaset using kubeclient update. It returns the updated replicaset and copies the updated replicaset +// into the passed in pointer as well. +func (c *rolloutContext) updateReplicaSet(ctx context.Context, rs *appsv1.ReplicaSet) (*appsv1.ReplicaSet, error) { updatedRS, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Update(ctx, rs, metav1.UpdateOptions{}) if err != nil { - if errors.IsConflict(err) { - if os.Getenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT") == "true" { - rsGet, err := c.replicaSetLister.ReplicaSets(rs.Namespace).Get(rs.Name) - if err != nil { - return nil, fmt.Errorf("error getting replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err) - } - rsGetJson, err := json.Marshal(rsGet) - if err != nil { - return nil, fmt.Errorf("error marshalling informer replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err) - } - rsCopyJson, err := json.Marshal(rs) - if err != nil { - return nil, fmt.Errorf("error marshalling memory replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err) - } - c.log.Infof("Informer RS: %s", rsGetJson) - c.log.Infof("Memory RS: %s", rsCopyJson) - } - - c.log.Infof("Conflict when updating replicaset %s, falling back to patch", rs.Name) - - patchRS := appsv1.ReplicaSet{} - patchRS.Spec.Replicas = rs.Spec.Replicas - patchRS.Spec.Template.Labels = rs.Spec.Template.Labels - patchRS.Spec.Template.Annotations = rs.Spec.Template.Annotations - - patchRS.Annotations = make(map[string]string) - patchRS.Labels = make(map[string]string) - patchRS.Spec.Selector = &metav1.LabelSelector{ - MatchLabels: make(map[string]string), - } - - if _, found := rs.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]; found { - patchRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey] = rs.Labels[v1alpha1.DefaultRolloutUniqueLabelKey] - } - - if _, found := rs.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]; found { - patchRS.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey] = rs.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey] - } - - if _, found := rs.Spec.Selector.MatchLabels[v1alpha1.DefaultRolloutUniqueLabelKey]; found { - patchRS.Spec.Selector.MatchLabels[v1alpha1.DefaultRolloutUniqueLabelKey] = rs.Spec.Selector.MatchLabels[v1alpha1.DefaultRolloutUniqueLabelKey] - } - - for key, value := range rs.Annotations { - if strings.HasPrefix(key, annotations.RolloutLabel) || - strings.HasPrefix(key, "argo-rollouts.argoproj.io") || - strings.HasPrefix(key, "experiment.argoproj.io") { - patchRS.Annotations[key] = value - } - } - for key, value := range rs.Labels { - if strings.HasPrefix(key, annotations.RolloutLabel) || - strings.HasPrefix(key, "argo-rollouts.argoproj.io") || - strings.HasPrefix(key, "experiment.argoproj.io") { - patchRS.Labels[key] = value - } - } - - patch, _, err := diff.CreateTwoWayMergePatch(appsv1.ReplicaSet{}, patchRS, appsv1.ReplicaSet{}) - if err != nil { - return nil, fmt.Errorf("error creating patch for conflict log in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err) - } - - c.log.Infof("Patching replicaset with patch: %s", string(patch)) - updatedRS, err = c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.StrategicMergePatchType, patch, metav1.PatchOptions{}) - if err != nil { - return nil, fmt.Errorf("error patching replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err) - } - - err = c.replicaSetInformer.GetIndexer().Update(updatedRS) - if err != nil { - return nil, fmt.Errorf("error updating replicaset informer in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err) - } - - return updatedRS, err - } - } - if updatedRS != nil { - updatedRS.DeepCopyInto(rs) + return nil, fmt.Errorf("error updating replicaset in updateReplicaSet %s: %w", rs.Name, err) } + updatedRS.DeepCopyInto(rs) + return rs, err } diff --git a/rollout/controller_test.go b/rollout/controller_test.go index 2e848212fa..383a539bd0 100644 --- a/rollout/controller_test.go +++ b/rollout/controller_test.go @@ -960,7 +960,7 @@ func (f *fixture) getUpdatedReplicaSet(index int) *appsv1.ReplicaSet { return rs } -func (f *fixture) getPatchedReplicaSet(index int) *appsv1.ReplicaSet { +func (f *fixture) getPatchedReplicaSet(index int) *appsv1.ReplicaSet { //nolint action := filterInformerActions(f.kubeclient.Actions())[index] patchAction, ok := action.(core.PatchAction) if !ok { diff --git a/rollout/ephemeralmetadata.go b/rollout/ephemeralmetadata.go index 29328e1c5f..c3b9d27a6b 100644 --- a/rollout/ephemeralmetadata.go +++ b/rollout/ephemeralmetadata.go @@ -71,7 +71,7 @@ func (c *rolloutContext) syncEphemeralMetadata(ctx context.Context, rs *appsv1.R // First update replicasets, then pods owned by it. // So that any replicas created in the interim between the two steps are using the new updated version. // 1. Update ReplicaSet so that any new pods it creates will have the metadata - rs, err := c.updateReplicaSetFallbackToPatch(ctx, modifiedRS) + rs, err := c.updateReplicaSet(ctx, modifiedRS) if err != nil { c.log.Infof("failed to sync ephemeral metadata %v to ReplicaSet %s: %v", podMetadata, rs.Name, err) return fmt.Errorf("failed to sync ephemeral metadata: %w", err) diff --git a/rollout/replicaset.go b/rollout/replicaset.go index c16ce6f037..e374b98286 100644 --- a/rollout/replicaset.go +++ b/rollout/replicaset.go @@ -39,10 +39,6 @@ func (c *rolloutContext) removeScaleDownDelay(rs *appsv1.ReplicaSet) error { return fmt.Errorf("error removing scale-down-deadline annotation from RS '%s': %w", rs.Name, err) } c.log.Infof("Removed '%s' annotation from RS '%s'", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name) - err = c.replicaSetInformer.GetIndexer().Update(rs) - if err != nil { - return fmt.Errorf("error updating replicaset informer in removeScaleDownDelay: %w", err) - } return err } @@ -68,10 +64,6 @@ func (c *rolloutContext) addScaleDownDelay(rs *appsv1.ReplicaSet, scaleDownDelay return fmt.Errorf("error adding scale-down-deadline annotation to RS '%s': %w", rs.Name, err) } c.log.Infof("Set '%s' annotation on '%s' to %s (%s)", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name, deadline, scaleDownDelaySeconds) - err = c.replicaSetInformer.GetIndexer().Update(rs) - if err != nil { - return fmt.Errorf("error updating replicaset informer in addScaleDownDelay: %w", err) - } return err } diff --git a/rollout/service_test.go b/rollout/service_test.go index 21e7401653..f31cd1bf3a 100644 --- a/rollout/service_test.go +++ b/rollout/service_test.go @@ -100,10 +100,8 @@ func TestGetPreviewAndActiveServices(t *testing.T) { noActiveSvcRollout := rollout.DeepCopy() noActiveSvcRollout.Spec.Strategy.BlueGreen.ActiveService = "" roCtx, err := c.newRolloutContext(noActiveSvcRollout) - assert.NoError(t, err) - _, _, err = roCtx.getPreviewAndActiveServices() - assert.NotNil(t, err) - assert.EqualError(t, err, "service \"\" not found") + assert.Error(t, err) + assert.Nil(t, roCtx) }) } diff --git a/rollout/sync.go b/rollout/sync.go index c0d394a74b..3a70878d8f 100644 --- a/rollout/sync.go +++ b/rollout/sync.go @@ -82,7 +82,7 @@ func (c *rolloutContext) syncReplicaSetRevision() (*appsv1.ReplicaSet, error) { rsCopy.Spec.MinReadySeconds = c.rollout.Spec.MinReadySeconds rsCopy.Spec.Template.Spec.Affinity = replicasetutil.GenerateReplicaSetAffinity(*c.rollout) - rs, err := c.updateReplicaSetFallbackToPatch(ctx, rsCopy) + rs, err := c.updateReplicaSet(ctx, rsCopy) if err != nil { return nil, fmt.Errorf("failed to update replicaset revision on %s: %w", rsCopy.Name, err) } @@ -372,13 +372,13 @@ func (c *rolloutContext) scaleReplicaSet(rs *appsv1.ReplicaSet, newScale int32, *(rsCopy.Spec.Replicas) = newScale annotations.SetReplicasAnnotations(rsCopy, rolloutReplicas) if fullScaleDown && !c.shouldDelayScaleDownOnAbort() { - // This bypasses the normal call to removeScaleDownDelay and then depends on the removal via an update in updateReplicaSetFallbackToPatch + // This bypasses the normal call to removeScaleDownDelay and then depends on the removal via an update in updateReplicaSet delete(rsCopy.Annotations, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey) } - rs, err = c.updateReplicaSetFallbackToPatch(ctx, rsCopy) + rs, err = c.updateReplicaSet(ctx, rsCopy) if err != nil { - return scaled, rs, fmt.Errorf("failed to updateReplicaSetFallbackToPatch in scaleReplicaSet: %w", err) + return scaled, rs, fmt.Errorf("failed to updateReplicaSet in scaleReplicaSet: %w", err) } if sizeNeedsUpdate {