Skip to content

Commit

Permalink
Add Versioned Hash
Browse files Browse the repository at this point in the history
  • Loading branch information
engedaam committed Feb 20, 2024
1 parent 15bb262 commit f704024
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 40 deletions.
1 change: 1 addition & 0 deletions kwok/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ func main() {
state.NewCluster(op.Clock, op.GetClient(), cloudProvider),
op.EventRecorder,
cloudProvider,
op.ApiextensionInterface,
)...).Start(ctx)
}
26 changes: 25 additions & 1 deletion pkg/apis/v1beta1/nodepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sort"
"strconv"

"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"

"github.com/mitchellh/hashstructure/v2"
"github.com/robfig/cron/v3"
"github.com/samber/lo"
Expand Down Expand Up @@ -189,14 +191,36 @@ type NodePool struct {
Status NodePoolStatus `json:"status,omitempty"`
}

func (in *NodePool) Hash() string {
func nodePoolHashVersion(ctx context.Context, apiInterface clientset.Interface) (string, error) {
npcrd, err := apiInterface.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, "nodepools.karpenter.sh", metav1.GetOptions{})
if err != nil {
return "", err
}

return fmt.Sprint(lo.Must(hashstructure.Hash(npcrd.Spec.Versions[0].Schema.OpenAPIV3Schema.Properties["spec"].Properties["template"], hashstructure.FormatV2, &hashstructure.HashOptions{
SlicesAsSets: true,
IgnoreZeroValue: true,
ZeroNil: true,
}))), nil
}

func (in *NodePool) nodePoolHash() string {
return fmt.Sprint(lo.Must(hashstructure.Hash(in.Spec.Template, hashstructure.FormatV2, &hashstructure.HashOptions{
SlicesAsSets: true,
IgnoreZeroValue: true,
ZeroNil: true,
})))
}

func (in *NodePool) Hash(ctx context.Context, apiextensionsInterface clientset.Interface) (string, error) {
nphv, err := nodePoolHashVersion(ctx, apiextensionsInterface)
if err != nil {
return "", err
}

return fmt.Sprintf("%s-%s", nphv, in.nodePoolHash()), nil
}

// NodePoolList contains a list of NodePool
// +kubebuilder:object:root=true
type NodePoolList struct {
Expand Down
5 changes: 4 additions & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"

"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"

"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/disruption"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
Expand Down Expand Up @@ -49,6 +51,7 @@ func NewControllers(
cluster *state.Cluster,
recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider,
apiextensionsInterface clientset.Interface,
) []controller.Controller {

p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster)
Expand All @@ -60,7 +63,7 @@ func NewControllers(
disruption.NewController(clock, kubeClient, p, cloudProvider, recorder, cluster, disruptionQueue),
provisioning.NewPodController(kubeClient, p, recorder),
provisioning.NewNodeController(kubeClient, p, recorder),
nodepoolhash.NewController(kubeClient),
nodepoolhash.NewController(kubeClient, apiextensionsInterface),
informer.NewDaemonSetController(kubeClient, cluster),
informer.NewNodeController(kubeClient, cluster),
informer.NewPodController(kubeClient, cluster),
Expand Down
10 changes: 8 additions & 2 deletions pkg/controllers/nodeclaim/disruption/drift.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package disruption
import (
"context"
"fmt"
"strings"
"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -115,15 +116,20 @@ func (d *Drift) isDrifted(ctx context.Context, nodePool *v1beta1.NodePool, nodeC
return driftedReason, nil
}

// Eligible fields for static drift are described in the docs
// Eligible fields for drift are described in the docs
// https://karpenter.sh/docs/concepts/deprovisioning/#drift
func areStaticFieldsDrifted(nodePool *v1beta1.NodePool, nodeClaim *v1beta1.NodeClaim) cloudprovider.DriftReason {
nodePoolHash, foundHashNodePool := nodePool.Annotations[v1beta1.NodePoolHashAnnotationKey]
nodeClaimHash, foundHashNodeClaim := nodeClaim.Annotations[v1beta1.NodePoolHashAnnotationKey]
if !foundHashNodePool || !foundHashNodeClaim {
return ""
}
return lo.Ternary(nodePoolHash != nodeClaimHash, NodePoolDrifted, "")
// validate that the version of the crd is the same
nodePoolHashSlice, nodeClaimHashSlice := strings.Split(nodePoolHash, "-"), strings.Split(nodeClaimHash, "-")
if len(nodePoolHashSlice) < 1 || len(nodeClaimHashSlice) < 1 || nodePoolHashSlice[0] != nodeClaimHashSlice[0] {
return ""
}
return lo.Ternary(nodePoolHashSlice[1] != nodeClaimHashSlice[1], NodePoolDrifted, "")
}

func areRequirementsDrifted(nodePool *v1beta1.NodePool, nodeClaim *v1beta1.NodeClaim) cloudprovider.DriftReason {
Expand Down
21 changes: 16 additions & 5 deletions pkg/controllers/nodeclaim/disruption/drift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var _ = Describe("Drift", func() {
v1.LabelInstanceTypeStable: test.RandomName(),
},
Annotations: map[string]string{
v1beta1.NodePoolHashAnnotationKey: nodePool.Hash(),
v1beta1.NodePoolHashAnnotationKey: "test-123",
},
},
})
Expand Down Expand Up @@ -122,7 +122,7 @@ var _ = Describe("Drift", func() {
It("should detect static drift before cloud provider drift", func() {
cp.Drifted = "drifted"
nodePool.Annotations = lo.Assign(nodePool.Annotations, map[string]string{
v1beta1.NodePoolHashAnnotationKey: "123456789",
v1beta1.NodePoolHashAnnotationKey: "test-123456789",
})
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectReconcileSucceeded(ctx, nodeClaimDisruptionController, client.ObjectKeyFromObject(nodeClaim))
Expand Down Expand Up @@ -372,6 +372,8 @@ var _ = Describe("Drift", func() {
v1beta1.CapacityTypeLabelKey: v1beta1.CapacityTypeOnDemand,
v1.LabelOSStable: string(v1.Linux),
})
hash, err := nodePool.Hash(ctx, env.ApiextensionInterface)
Expect(err).To(BeNil())
nodeClaimTwo, _ := test.NodeClaimAndNode(v1beta1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand All @@ -381,7 +383,7 @@ var _ = Describe("Drift", func() {
v1.LabelOSStable: string(v1.Windows),
},
Annotations: map[string]string{
v1beta1.NodePoolHashAnnotationKey: nodePool.Hash(),
v1beta1.NodePoolHashAnnotationKey: hash,
},
},
Status: v1beta1.NodeClaimStatus{
Expand Down Expand Up @@ -420,7 +422,7 @@ var _ = Describe("Drift", func() {
var nodePoolController controller.Controller
BeforeEach(func() {
cp.Drifted = ""
nodePoolController = hash.NewController(env.Client)
nodePoolController = hash.NewController(env.Client, env.ApiextensionInterface)
nodePoolOptions = v1beta1.NodePool{
ObjectMeta: nodePool.ObjectMeta,
Spec: v1beta1.NodePoolSpec{
Expand Down Expand Up @@ -455,7 +457,9 @@ var _ = Describe("Drift", func() {
},
},
}
nodeClaim.ObjectMeta.Annotations[v1beta1.NodePoolHashAnnotationKey] = nodePool.Hash()
hash, err := nodePool.Hash(ctx, env.ApiextensionInterface)
Expect(err).To(BeNil())
nodeClaim.ObjectMeta.Annotations[v1beta1.NodePoolHashAnnotationKey] = hash
})
It("should detect drift on changes for all static fields", func() {
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
Expand Down Expand Up @@ -495,5 +499,12 @@ var _ = Describe("Drift", func() {
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().GetCondition(v1beta1.Drifted)).To(BeNil())
})
It("should not return drifted if karpenter.sh/nodePool-hash annotation version does not match the nodeClaim", func() {
nodeClaim.ObjectMeta.Annotations = map[string]string{v1beta1.NodePoolHashAnnotationKey: "test-hash"}
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectReconcileSucceeded(ctx, nodeClaimDisruptionController, client.ObjectKeyFromObject(nodeClaim))
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().GetCondition(v1beta1.Drifted)).To(BeNil())
})
})
})
52 changes: 48 additions & 4 deletions pkg/controllers/nodepool/hash/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ package hash

import (
"context"
"strings"

"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/equality"
"knative.dev/pkg/logging"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
operatorcontroller "sigs.k8s.io/karpenter/pkg/operator/controller"
)
Expand All @@ -36,19 +40,30 @@ var _ operatorcontroller.TypedController[*v1beta1.NodePool] = (*Controller)(nil)
// Controller is hash controller that constructs a hash based on the fields that are considered for static drift.
// The hash is placed in the metadata for increased observability and should be found on each object.
type Controller struct {
kubeClient client.Client
kubeClient client.Client
apiextensionsInterface clientset.Interface
}

func NewController(kubeClient client.Client) operatorcontroller.Controller {
func NewController(kubeClient client.Client, apiextensionsInterface clientset.Interface) operatorcontroller.Controller {
return operatorcontroller.Typed[*v1beta1.NodePool](kubeClient, &Controller{
kubeClient: kubeClient,
kubeClient: kubeClient,
apiextensionsInterface: apiextensionsInterface,
})
}

// Reconcile the resource
func (c *Controller) Reconcile(ctx context.Context, np *v1beta1.NodePool) (reconcile.Result, error) {
stored := np.DeepCopy()
np.Annotations = lo.Assign(np.Annotations, map[string]string{v1beta1.NodePoolHashAnnotationKey: np.Hash()})
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("nodepool", np.Name))

newHash, err := np.Hash(ctx, c.apiextensionsInterface)
if err != nil {
return reconcile.Result{}, err
}
np.Annotations = lo.Assign(np.Annotations, map[string]string{v1beta1.NodePoolHashAnnotationKey: newHash})
if err := c.updateNodeClaimHash(ctx, np, newHash); err != nil {
return reconcile.Result{}, err
}

if !equality.Semantic.DeepEqual(stored, np) {
if err := c.kubeClient.Patch(ctx, np, client.MergeFrom(stored)); err != nil {
Expand All @@ -69,3 +84,32 @@ func (c *Controller) Builder(_ context.Context, m manager.Manager) operatorcontr
WithOptions(controller.Options{MaxConcurrentReconciles: 10}),
)
}

func (c *Controller) updateNodeClaimHash(ctx context.Context, np *v1beta1.NodePool, nphash string) error {
ncList := &v1beta1.NodeClaimList{}
if err := c.kubeClient.List(ctx, ncList); err != nil {
return client.IgnoreNotFound(err)
}

for i := range ncList.Items {
stored := ncList.Items[i].DeepCopy()
if ncList.Items[i].Labels[v1beta1.ManagedByAnnotationKey] != np.Name {
continue
}

if ncHash, ok := ncList.Items[i].Annotations[v1beta1.NodePoolHashAnnotationKey]; ok && ncList.Items[i].StatusConditions().GetCondition(v1beta1.Drifted) != nil {
ncVersion, npVersion := strings.Split(ncHash, "-")[0], strings.Split(np.Annotations[v1beta1.NodePoolHashAnnotationKey], "-")[0]
if ncVersion != npVersion {
ncList.Items[i].Annotations = lo.Assign(ncList.Items[i].Annotations, map[string]string{v1beta1.NodePoolHashAnnotationKey: nphash})
if !equality.Semantic.DeepEqual(stored, ncList.Items[i]) {
if err := c.kubeClient.Patch(ctx, &ncList.Items[i], client.MergeFrom(stored)); err != nil {
return client.IgnoreNotFound(err)
}
}
}
}

}

return nil
}
63 changes: 55 additions & 8 deletions pkg/controllers/nodepool/hash/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (

. "sigs.k8s.io/karpenter/pkg/test/expectations"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/karpenter/pkg/apis"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash"
Expand All @@ -52,7 +54,7 @@ func TestAPIs(t *testing.T) {

var _ = BeforeSuite(func() {
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))
nodePoolController = hash.NewController(env.Client)
nodePoolController = hash.NewController(env.Client, env.ApiextensionInterface)
})

var _ = AfterSuite(func() {
Expand All @@ -61,6 +63,7 @@ var _ = AfterSuite(func() {

var _ = Describe("Static Drift Hash", func() {
var nodePool *v1beta1.NodePool
var nodeClaimOne *v1beta1.NodeClaim
BeforeEach(func() {
nodePool = test.NodePool(v1beta1.NodePool{
Spec: v1beta1.NodePoolSpec{
Expand Down Expand Up @@ -94,30 +97,49 @@ var _ = Describe("Static Drift Hash", func() {
},
},
})
hash, err := nodePool.Hash(ctx, env.ApiextensionInterface)
Expect(err).To(BeNil())
nodeClaimOne = test.NodeClaim(v1beta1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{v1beta1.NodePoolLabelKey: nodePool.Name},
Annotations: map[string]string{v1beta1.NodePoolHashAnnotationKey: hash},
},
})
})
It("should update the static drift hash when NodePool static field is updated", func() {
ExpectApplied(ctx, env.Client, nodePool)
// Need to Update testing here
It("should update the drift hash when NodePool static field is updated", func() {
ExpectApplied(ctx, env.Client, nodePool, nodeClaimOne)
ExpectReconcileSucceeded(ctx, nodePoolController, client.ObjectKeyFromObject(nodePool))
nodePool = ExpectExists(ctx, env.Client, nodePool)
nodeClaimOne = ExpectExists(ctx, env.Client, nodeClaimOne)

expectedHash := nodePool.Hash()
expectedHash, err := nodePool.Hash(ctx, env.ApiextensionInterface)
Expect(err).To(BeNil())
Expect(nodePool.Annotations).To(HaveKeyWithValue(v1beta1.NodePoolHashAnnotationKey, expectedHash))
Expect(nodeClaimOne.Annotations).To(HaveKeyWithValue(v1beta1.NodePoolHashAnnotationKey, expectedHash))

nodePool.Spec.Template.Labels = map[string]string{"keyLabeltest": "valueLabeltest"}
nodePool.Spec.Template.Annotations = map[string]string{"keyAnnotation2": "valueAnnotation2", "keyAnnotation": "valueAnnotation"}
ExpectReconcileSucceeded(ctx, nodePoolController, client.ObjectKeyFromObject(nodePool))
nodePool = ExpectExists(ctx, env.Client, nodePool)
nodeClaimOne = ExpectExists(ctx, env.Client, nodeClaimOne)

expectedHashTwo := nodePool.Hash()
expectedHashTwo, err := nodePool.Hash(ctx, env.ApiextensionInterface)
Expect(err).To(BeNil())
Expect(nodePool.Annotations).To(HaveKeyWithValue(v1beta1.NodePoolHashAnnotationKey, expectedHashTwo))
// Expect NodeClaims not to have been updated
Expect(nodeClaimOne.Annotations).To(HaveKeyWithValue(v1beta1.NodePoolHashAnnotationKey, expectedHash))
})
It("should not update the static drift hash when NodePool behavior field is updated", func() {
ExpectApplied(ctx, env.Client, nodePool)
It("should not update the drift hash when NodePool behavior field is updated", func() {
ExpectApplied(ctx, env.Client, nodePool, nodeClaimOne)
ExpectReconcileSucceeded(ctx, nodePoolController, client.ObjectKeyFromObject(nodePool))
nodePool = ExpectExists(ctx, env.Client, nodePool)
nodeClaimOne = ExpectExists(ctx, env.Client, nodeClaimOne)

expectedHash := nodePool.Hash()
expectedHash, err := nodePool.Hash(ctx, env.ApiextensionInterface)
Expect(err).To(BeNil())
Expect(nodePool.Annotations).To(HaveKeyWithValue(v1beta1.NodePoolHashAnnotationKey, expectedHash))
Expect(nodeClaimOne.Annotations).To(HaveKeyWithValue(v1beta1.NodePoolHashAnnotationKey, expectedHash))

nodePool.Spec.Limits = v1beta1.Limits(v1.ResourceList{"cpu": resource.MustParse("16")})
nodePool.Spec.Disruption.ConsolidationPolicy = v1beta1.ConsolidationPolicyWhenEmpty
Expand All @@ -131,7 +153,32 @@ var _ = Describe("Static Drift Hash", func() {
nodePool.Spec.Weight = lo.ToPtr(int32(80))
ExpectReconcileSucceeded(ctx, nodePoolController, client.ObjectKeyFromObject(nodePool))
nodePool = ExpectExists(ctx, env.Client, nodePool)
nodeClaimOne = ExpectExists(ctx, env.Client, nodeClaimOne)

Expect(nodePool.Annotations).To(HaveKeyWithValue(v1beta1.NodePoolHashAnnotationKey, expectedHash))
// Expect NodeClaims not to have been updated
Expect(nodeClaimOne.Annotations).To(HaveKeyWithValue(v1beta1.NodePoolHashAnnotationKey, expectedHash))
})
It("should update NodePool hash on all NodeClaims when the hash versions don't match", func() {
ExpectApplied(ctx, env.Client, nodePool, nodeClaimOne)
ExpectReconcileSucceeded(ctx, nodePoolController, client.ObjectKeyFromObject(nodePool))
nodePool = ExpectExists(ctx, env.Client, nodePool)
nodeClaimOne = ExpectExists(ctx, env.Client, nodeClaimOne)

expectedHash, err := nodePool.Hash(ctx, env.ApiextensionInterface)
Expect(err).To(BeNil())
Expect(nodePool.Annotations).To(HaveKeyWithValue(v1beta1.NodePoolHashAnnotationKey, expectedHash))
Expect(nodeClaimOne.Annotations).To(HaveKeyWithValue(v1beta1.NodePoolHashAnnotationKey, expectedHash))

nodePool.Annotations[v1beta1.NodePoolHashAnnotationKey] = "testversion-nodepoolhash"
nodeClaimOne.Annotations[v1beta1.NodePoolHashAnnotationKey] = "testversion-nodepoolhash"

ExpectReconcileSucceeded(ctx, nodePoolController, client.ObjectKeyFromObject(nodePool))
nodePool = ExpectExists(ctx, env.Client, nodePool)
nodeClaimOne = ExpectExists(ctx, env.Client, nodeClaimOne)

Expect(nodePool.Annotations).To(HaveKeyWithValue(v1beta1.NodePoolHashAnnotationKey, expectedHash))
// Expect NodeClaims to have been updated to the original hash
Expect(nodeClaimOne.Annotations).To(HaveKeyWithValue(v1beta1.NodePoolHashAnnotationKey, expectedHash))
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (i *NodeClaimTemplate) ToNodeClaim(nodePool *v1beta1.NodePool) *v1beta1.Nod
nc := &v1beta1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-", i.NodePoolName),
Annotations: lo.Assign(i.Annotations, map[string]string{v1beta1.NodePoolHashAnnotationKey: nodePool.Hash()}),
Annotations: lo.Assign(i.Annotations, map[string]string{v1beta1.NodePoolHashAnnotationKey: nodePool.Annotations[v1beta1.NodePoolHashAnnotationKey]}),
Labels: i.Labels,
OwnerReferences: []metav1.OwnerReference{
{
Expand Down
Loading

0 comments on commit f704024

Please sign in to comment.