Skip to content

Commit

Permalink
chore: move ReplicaSet creation and Rollout validation earlier during…
Browse files Browse the repository at this point in the history
… the reconciliation process. (#3657)

refactor replicaset creation

Signed-off-by: Zach Aller <[email protected]>
  • Loading branch information
zachaller authored Aug 26, 2024
1 parent 80fff8e commit 7938e84
Show file tree
Hide file tree
Showing 17 changed files with 577 additions and 255 deletions.
8 changes: 4 additions & 4 deletions rollout/analysis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func TestInvalidSpecMissingClusterTemplatesBackgroundAnalysis(t *testing.T) {
f.objects = append(f.objects, r)

patchIndex := f.expectPatchRolloutAction(r)
f.run(getKey(r, t))
f.runExpectError(getKey(r, t), true)

expectedPatchWithoutSub := `{
"status": {
Expand Down Expand Up @@ -961,7 +961,7 @@ func TestFailCreateStepAnalysisRunIfInvalidTemplateRef(t *testing.T) {
f.objects = append(f.objects, r, at)

patchIndex := f.expectPatchRolloutAction(r)
f.run(getKey(r, t))
f.runExpectError(getKey(r, t), true)

expectedPatchWithoutSub := `{
"status": {
Expand Down Expand Up @@ -1006,7 +1006,7 @@ func TestFailCreateBackgroundAnalysisRunIfInvalidTemplateRef(t *testing.T) {
f.objects = append(f.objects, r, at)

patchIndex := f.expectPatchRolloutAction(r)
f.run(getKey(r, t))
f.runExpectError(getKey(r, t), true)

expectedPatchWithoutSub := `{
"status": {
Expand Down Expand Up @@ -1055,7 +1055,7 @@ func TestFailCreateBackgroundAnalysisRunIfMetricRepeated(t *testing.T) {
f.objects = append(f.objects, r, at, at2)

patchIndex := f.expectPatchRolloutAction(r)
f.run(getKey(r, t))
f.runExpectError(getKey(r, t), true)

expectedPatchWithoutSub := `{
"status": {
Expand Down
2 changes: 1 addition & 1 deletion rollout/bluegreen.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (c *rolloutContext) rolloutBlueGreen() error {
if err != nil {
return err
}
c.newRS, err = c.getAllReplicaSetsAndSyncRevision(true)
c.newRS, err = c.getAllReplicaSetsAndSyncRevision()
if err != nil {
return fmt.Errorf("failed to getAllReplicaSetsAndSyncRevision in rolloutBlueGreen create true: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions rollout/bluegreen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,8 +950,10 @@ func TestBlueGreenRolloutStatusHPAStatusFieldsNoActiveSelector(t *testing.T) {
f := newFixture(t)
defer f.Close()
f.objects = append(f.objects, ro)
f.kubeobjects = append(f.kubeobjects, activeSvc)
f.rolloutLister = append(f.rolloutLister, ro)
f.replicaSetLister = append(f.replicaSetLister, rs)
f.serviceLister = append(f.serviceLister, activeSvc)

ctrl, _, _ := f.newController(noResyncPeriodFunc)
roCtx, err := ctrl.newRolloutContext(ro)
Expand Down
11 changes: 2 additions & 9 deletions rollout/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import (
func (c *rolloutContext) rolloutCanary() error {
var err error
if replicasetutil.PodTemplateOrStepsChanged(c.rollout, c.newRS) {
c.newRS, err = c.getAllReplicaSetsAndSyncRevision(false)
c.newRS, err = c.getAllReplicaSetsAndSyncRevision()
if err != nil {
return fmt.Errorf("failed to getAllReplicaSetsAndSyncRevision in rolloutCanary with PodTemplateOrStepsChanged: %w", err)
}
return c.syncRolloutStatusCanary()
}

c.newRS, err = c.getAllReplicaSetsAndSyncRevision(true)
c.newRS, err = c.getAllReplicaSetsAndSyncRevision()
if err != nil {
return fmt.Errorf("failed to getAllReplicaSetsAndSyncRevision in rolloutCanary create true: %w", err)
}
Expand Down Expand Up @@ -448,13 +448,6 @@ func (c *rolloutContext) reconcileCanaryReplicaSets() (bool, error) {
return true, nil
}

// If we have updated both the replica count and the pod template hash c.newRS will be nil we want to reconcile the newRS so we look at the
// rollout status to get the newRS to reconcile it.
if c.newRS == nil && c.rollout.Status.CurrentPodHash != c.rollout.Status.StableRS {
rs, _ := replicasetutil.GetReplicaSetByTemplateHash(c.allRSs, c.rollout.Status.CurrentPodHash)
c.newRS = rs
}

scaledNewRS, err := c.reconcileNewReplicaSet()
if err != nil {
return false, err
Expand Down
72 changes: 21 additions & 51 deletions rollout/canary_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rollout

import (
"context"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -421,8 +420,11 @@ func TestResetCurrentStepIndexOnStepChange(t *testing.T) {
f.rolloutLister = append(f.rolloutLister, r2)
f.objects = append(f.objects, r2)

f.expectUpdateRolloutStatusAction(r2)
patchIndex := f.expectPatchRolloutAction(r2)
createRSIndex := f.expectCreateReplicaSetAction(rs1)
f.run(getKey(r2, t))
createdRS := f.getCreatedReplicaSet(createRSIndex)

patch := f.getPatchedRollout(patchIndex)
expectedPatchWithoutPodHash := `{
Expand All @@ -433,7 +435,7 @@ func TestResetCurrentStepIndexOnStepChange(t *testing.T) {
"conditions": %s
}
}`
newConditions := generateConditionsPatch(true, conditions.ReplicaSetUpdatedReason, r2, false, "", false)
newConditions := generateConditionsPatch(true, conditions.ReplicaSetUpdatedReason, createdRS, false, "", false)
expectedPatch := fmt.Sprintf(expectedPatchWithoutPodHash, expectedCurrentPodHash, expectedCurrentStepHash, newConditions)
assert.JSONEq(t, calculatePatch(r2, expectedPatch), patch)
}
Expand Down Expand Up @@ -462,18 +464,23 @@ func TestResetCurrentStepIndexOnPodSpecChange(t *testing.T) {
f.rolloutLister = append(f.rolloutLister, r2)
f.objects = append(f.objects, r2)

f.expectUpdateRolloutStatusAction(r2)
patchIndex := f.expectPatchRolloutAction(r2)
createdRSIndex := f.expectCreateReplicaSetAction(rs1)

f.run(getKey(r2, t))

patch := f.getPatchedRollout(patchIndex)
updatedRS := f.getUpdatedReplicaSet(createdRSIndex)

expectedPatchWithoutPodHash := `{
"status": {
"currentStepIndex":0,
"currentPodHash": "%s",
"conditions": %s
}
}`
newConditions := generateConditionsPatch(true, conditions.ReplicaSetUpdatedReason, r2, false, "", false)
newConditions := generateConditionsPatch(true, conditions.ReplicaSetUpdatedReason, updatedRS, false, "", false)

expectedPatch := fmt.Sprintf(expectedPatchWithoutPodHash, expectedCurrentPodHash, newConditions)
assert.JSONEq(t, calculatePatch(r2, expectedPatch), patch)
Expand Down Expand Up @@ -1564,7 +1571,7 @@ func TestCanaryRolloutWithInvalidCanaryServiceName(t *testing.T) {
f.kubeobjects = append(f.kubeobjects, rs)

patchIndex := f.expectPatchRolloutAction(rollout)
f.run(getKey(rollout, t))
f.runExpectError(getKey(rollout, t), true)

patch := make(map[string]any)
patchData := f.getPatchedRollout(patchIndex)
Expand Down Expand Up @@ -1616,7 +1623,7 @@ func TestCanaryRolloutWithInvalidStableServiceName(t *testing.T) {
f.kubeobjects = append(f.kubeobjects, rs)

patchIndex := f.expectPatchRolloutAction(rollout)
f.run(getKey(rollout, t))
f.runExpectError(getKey(rollout, t), true)

patch := make(map[string]any)
patchData := f.getPatchedRollout(patchIndex)
Expand Down Expand Up @@ -1665,7 +1672,7 @@ func TestCanaryRolloutWithInvalidPingServiceName(t *testing.T) {
f.objects = append(f.objects, r)

patchIndex := f.expectPatchRolloutAction(r)
f.run(getKey(r, t))
f.runExpectError(getKey(r, t), true)

patch := make(map[string]any)
patchData := f.getPatchedRollout(patchIndex)
Expand Down Expand Up @@ -1697,7 +1704,7 @@ func TestCanaryRolloutWithInvalidPongServiceName(t *testing.T) {
f.serviceLister = append(f.serviceLister, pingSvc)

patchIndex := f.expectPatchRolloutAction(r)
f.run(getKey(r, t))
f.runExpectError(getKey(r, t), true)

patch := make(map[string]any)
patchData := f.getPatchedRollout(patchIndex)
Expand Down Expand Up @@ -1896,8 +1903,14 @@ func TestHandleNilNewRSOnScaleAndImageChange(t *testing.T) {
f.rolloutLister = append(f.rolloutLister, r2)
f.objects = append(f.objects, r2)

f.expectUpdateReplicaSetAction(rs1)
f.expectUpdateRolloutStatusAction(r2)
f.expectPatchRolloutAction(r2)
patchIndex := f.expectPatchRolloutAction(r2)

f.expectCreateReplicaSetAction(rs1)
f.expectUpdateReplicaSetAction(rs1)
f.expectUpdateReplicaSetAction(rs1)

f.run(getKey(r2, t))
patch := f.getPatchedRollout(patchIndex)
assert.JSONEq(t, calculatePatch(r2, OnlyObservedGenerationPatch), patch)
Expand Down Expand Up @@ -2105,49 +2118,6 @@ func TestIsDynamicallyRollingBackToStable(t *testing.T) {
}
}

func TestCanaryReplicaAndSpecChangedTogether(t *testing.T) {
f := newFixture(t)
defer f.Close()

originReplicas := 3
r1 := newCanaryRollout("foo", originReplicas, nil, nil, nil, intstr.FromInt(1), intstr.FromInt(0))
canarySVCName := "canary"
stableSVCName := "stable"
r1.Spec.Strategy.Canary.CanaryService = canarySVCName
r1.Spec.Strategy.Canary.StableService = stableSVCName

stableRS := newReplicaSetWithStatus(r1, originReplicas, originReplicas)
stableSVC := newService(stableSVCName, 80,
map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: stableRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]}, r1)

r2 := bumpVersion(r1)
canaryRS := newReplicaSetWithStatus(r2, originReplicas, originReplicas)
canarySVC := newService(canarySVCName, 80,
map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: canaryRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]}, r2)

f.replicaSetLister = append(f.replicaSetLister, canaryRS, stableRS)
f.serviceLister = append(f.serviceLister, canarySVC, stableSVC)

r3 := bumpVersion(r2)
r3.Spec.Replicas = pointer.Int32(int32(originReplicas) + 5)
r3.Status.StableRS = stableRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
r3.Status.CurrentPodHash = canaryRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]

f.rolloutLister = append(f.rolloutLister, r3)
f.kubeobjects = append(f.kubeobjects, canaryRS, stableRS, canarySVC, stableSVC)
f.objects = append(f.objects, r3)

ctrl, _, _ := f.newController(noResyncPeriodFunc)
roCtx, err := ctrl.newRolloutContext(r3)
assert.NoError(t, err)
err = roCtx.reconcile()
assert.NoError(t, err)
updated, err := f.kubeclient.AppsV1().ReplicaSets(r3.Namespace).Get(context.Background(), canaryRS.Name, metav1.GetOptions{})
assert.NoError(t, err)
// check the canary one is updated
assert.NotEqual(t, originReplicas, int(*updated.Spec.Replicas))
}

func TestSyncRolloutWithConflictInScaleReplicaSet(t *testing.T) {
os.Setenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT", "true")
defer os.Unsetenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT")
Expand Down
22 changes: 3 additions & 19 deletions rollout/context.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package rollout

import (
"time"

log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/validation/field"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
analysisutil "github.com/argoproj/argo-rollouts/utils/analysis"
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
)

type rolloutContext struct {
Expand Down Expand Up @@ -53,19 +49,7 @@ type rolloutContext struct {
}

func (c *rolloutContext) reconcile() error {
// Get Rollout Validation errors
err := c.getRolloutValidationErrors()
if err != nil {
if vErr, ok := err.(*field.Error); ok {
// We want to frequently requeue rollouts with InvalidSpec errors, because the error
// condition might be timing related (e.g. the Rollout was applied before the Service).
c.enqueueRolloutAfter(c.rollout, 20*time.Second)
return c.createInvalidRolloutCondition(vErr, c.rollout)
}
return err
}

err = c.checkPausedConditions()
err := c.checkPausedConditions()
if err != nil {
return err
}
Expand Down
32 changes: 32 additions & 0 deletions rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,38 @@ func (c *Controller) newRolloutContext(rollout *v1alpha1.Rollout) (*rolloutConte
},
reconcilerBase: c.reconcilerBase,
}

// Get Rollout Validation errors
err = roCtx.getRolloutValidationErrors()
if err != nil {
if vErr, ok := err.(*field.Error); ok {
// We want to frequently requeue rollouts with InvalidSpec errors, because the error
// condition might be timing related (e.g. the Rollout was applied before the Service).
c.enqueueRolloutAfter(roCtx.rollout, 20*time.Second)
err := roCtx.createInvalidRolloutCondition(vErr, roCtx.rollout)
if err != nil {
return nil, err
}
return nil, vErr
}
return nil, err
}

if roCtx.newRS == nil {
roCtx.newRS, err = roCtx.createDesiredReplicaSet()
if err != nil {
return nil, err
}
roCtx.olderRSs = replicasetutil.FindOldReplicaSets(roCtx.rollout, rsList, roCtx.newRS)
roCtx.stableRS = replicasetutil.GetStableRS(roCtx.rollout, roCtx.newRS, roCtx.olderRSs)
roCtx.otherRSs = replicasetutil.GetOtherRSs(roCtx.rollout, roCtx.newRS, roCtx.stableRS, rsList)
roCtx.allRSs = append(rsList, roCtx.newRS)
err := roCtx.replicaSetInformer.GetIndexer().Add(roCtx.newRS)
if err != nil {
return nil, err
}
}

if rolloututil.IsFullyPromoted(rollout) && roCtx.pauseContext.IsAborted() {
logCtx.Warnf("Removing abort condition from fully promoted rollout")
roCtx.pauseContext.RemoveAbort()
Expand Down
Loading

0 comments on commit 7938e84

Please sign in to comment.