Skip to content

Commit

Permalink
Ensure deplyoment await logic uses the latest deployment object (#2943)
Browse files Browse the repository at this point in the history
### Proposed changes

This PR ensures that we compare deployment revisions based on the old
live object, and the most current deployment object.

This PR does the following:

- Add a repro test that fails before the change is applied
- Only watch for deployment events until the deployment object has been
reconciled by the deployment controller
- Update existing unit tests to account for blocking unbuffered
channels, and faked objects that did not have
`.status.observedGeneration`

### Related issues (optional)

Fixes: #2941
  • Loading branch information
rquitales authored Apr 11, 2024
1 parent 78ab387 commit c8150eb
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- ConfigFile V2 (https://github.com/pulumi/pulumi-kubernetes/pull/2862)
- Bugfix for ambiguous kinds (https://github.com/pulumi/pulumi-kubernetes/pull/2889)
- [yaml/v2] Support for resource ordering (https://github.com/pulumi/pulumi-kubernetes/pull/2894)
- Bugfix for deployment await logic not referencing the correct deployment status (https://github.com/pulumi/pulumi-kubernetes/pull/2943)

### New Features

Expand Down
46 changes: 46 additions & 0 deletions provider/pkg/await/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ func (dia *deploymentInitAwaiter) await(
timeout,
aggregateErrorTicker <-chan time.Time,
) error {

// Before we start processing any ReplicaSet, PVC or Pod events, we need to wait until the Deployment controller
// has seen and updated the status of the Deployment object.
if err := dia.waitUntilDeploymentControllerReconciles(deploymentEvents, timeout); err != nil {
return err
}

for {
if dia.checkAndLogStatus() {
return nil
Expand Down Expand Up @@ -369,6 +376,45 @@ func (dia *deploymentInitAwaiter) await(
}
}

func (dia *deploymentInitAwaiter) waitUntilDeploymentControllerReconciles(deploymentEvents <-chan watch.Event, timeout <-chan time.Time) error {
for {
select {
case <-dia.config.ctx.Done():
return &cancellationError{
object: dia.deployment,
subErrors: dia.errorMessages(),
}
case <-timeout:
return &timeoutError{
object: dia.deployment,
subErrors: dia.errorMessages(),
}
case event := <-deploymentEvents:
deployment, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
logger.V(3).Infof("Deployment watch received unknown object type %T",
event.Object)
continue
}

// Do nothing if this is not the Deployment we're waiting for.
if deployment.GetName() != dia.deployment.GetName() {
continue
}

observedGeneration, found, err := unstructured.NestedInt64(deployment.Object, "status", "observedGeneration")
if err != nil || !found {
continue
}

if deployment.GetGeneration() == observedGeneration {
dia.processDeploymentEvent(event)
return nil
}
}
}
}

// Check whether we've succeeded, log the result as a status message to the provider. There are two
// cases:
//
Expand Down
23 changes: 15 additions & 8 deletions provider/pkg/await/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,10 +607,10 @@ func Test_Apps_Deployment_With_PersistentVolumeClaims(t *testing.T) {
// Controller creates ReplicaSet, and it tries to progress, but
// it fails when there are no PersistentVolumes available to fulfill the
// PersistentVolumeClaim.
pvcs <- watchAddedEvent(
persistentVolumeClaimInput(inputNamespace, pvcInputName))
deployments <- watchAddedEvent(
deploymentWithPVCAdded(inputNamespace, deploymentInputName, revision1, pvcInputName))
pvcs <- watchAddedEvent(
persistentVolumeClaimInput(inputNamespace, pvcInputName))
deployments <- watchAddedEvent(
deploymentWithPVCProgressing(inputNamespace, deploymentInputName, revision1, pvcInputName))
deployments <- watchAddedEvent(
Expand All @@ -635,6 +635,7 @@ func Test_Apps_Deployment_With_PersistentVolumeClaims(t *testing.T) {
updateAwaitConfig{
createAwaitConfig: mockAwaitConfig(deploymentWithPVCInput(inputNamespace, deploymentInputName, pvcInputName)),
})
// Channels should be buffered to avoid blocking.
deployments := make(chan watch.Event)
replicaSets := make(chan watch.Event)
pods := make(chan watch.Event)
Expand Down Expand Up @@ -664,12 +665,12 @@ func Test_Apps_Deployment_Without_PersistentVolumeClaims(t *testing.T) {
// The Deployment specifically does not reference the PVC in
// its spec. Therefore, the Deployment should succeed no
// matter what phase the PVC is in as it does not have to wait on it.
deployments <- watchAddedEvent(
deploymentAdded(inputNamespace, deploymentInputName, revision1))
pvcs <- watchAddedEvent(
persistentVolumeClaimInput(inputNamespace, pvcInputName))
pvcs <- watchAddedEvent(
persistentVolumeClaimPending(inputNamespace, pvcInputName))
deployments <- watchAddedEvent(
deploymentAdded(inputNamespace, deploymentInputName, revision1))
deployments <- watchAddedEvent(
deploymentProgressing(inputNamespace, deploymentInputName, revision1))
deployments <- watchAddedEvent(
Expand Down Expand Up @@ -1005,7 +1006,9 @@ func deploymentAdded(namespace, name, revision string) *unstructured.Unstructure
}
}
},
"status": {}
"status": {
"observedGeneration": 1
}
}`, namespace, name, revision))
if err != nil {
panic(err)
Expand Down Expand Up @@ -1054,6 +1057,7 @@ func deploymentProgressing(namespace, name, revision string) *unstructured.Unstr
}
},
"status": {
"observedGeneration": 1,
"conditions": [
{
"type": "Progressing",
Expand Down Expand Up @@ -1537,7 +1541,7 @@ func deploymentUpdated(namespace, name, revision string) *unstructured.Unstructu
}
},
"status": {
"observedGeneration": 1,
"observedGeneration": 2,
"replicas": 1,
"updatedReplicas": 1,
"readyReplicas": 1,
Expand Down Expand Up @@ -1609,7 +1613,7 @@ func deploymentScaledToZero(namespace, name, revision string) *unstructured.Unst
}
},
"status": {
"observedGeneration": 1,
"observedGeneration": 2,
"replicas": 0,
"conditions": [
{
Expand Down Expand Up @@ -1840,7 +1844,9 @@ func deploymentWithPVCAdded(namespace, name, revision, pvcName string) *unstruct
}
}
},
"status": {}
"status": {
"observedGeneration": 1
}
}`, namespace, name, revision, pvcName))
if err != nil {
panic(err)
Expand Down Expand Up @@ -1903,6 +1909,7 @@ func deploymentWithPVCProgressing(namespace, name, revision, pvcName string) *un
}
},
"status": {
"observedGeneration": 1,
"conditions": [
{
"type": "Progressing",
Expand Down
44 changes: 44 additions & 0 deletions tests/sdk/nodejs/deployment-rollout/step4/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2016-2024, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import * as k8s from "@pulumi/kubernetes";

export const namespace = new k8s.core.v1.Namespace("test-namespace");

const appLabels = { app: "nginx" };
new k8s.apps.v1.Deployment("nginx", {
metadata: {
namespace: namespace.metadata.name,
annotations: {
"pulumi.com/timeoutSeconds": "30",
},
},
spec: {
selector: { matchLabels: appLabels },
replicas: 1,
template: {
metadata: { labels: appLabels },
spec: {
containers: [
{
name: "nginx",
image: "nginx:fake", // Should trigger a failure on await.
ports: [{ containerPort: 80 }],
},
],
schedulerName: "default-scheduler",
},
},
},
});
7 changes: 7 additions & 0 deletions tests/sdk/nodejs/nodejs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,13 @@ func TestDeploymentRollout(t *testing.T) {
assert.Equal(t, image.(string), "nginx:stable")
},
},
{
// This is a deployment spec update that should cause a failure on await.
// https://github.com/pulumi/pulumi-kubernetes/issues/2941
Dir: filepath.Join("deployment-rollout", "step4"),
Additive: true,
ExpectFailure: true,
},
},
})
integration.ProgramTest(t, &test)
Expand Down

0 comments on commit c8150eb

Please sign in to comment.