Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement graceful error handling in operator package #1683

Merged
merged 11 commits into from
Sep 27, 2024
95 changes: 65 additions & 30 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package operator

import (
"context"
"errors"
"fmt"
"net/http"
"net/http/pprof"
Expand All @@ -33,15 +34,18 @@ import (
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -166,39 +170,25 @@ func NewOperator() (context.Context, *Operator) {
}
mgr, err := ctrl.NewManager(config, mgrOpts)
mgr = lo.Must(mgr, err, "failed to setup manager")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Pod{}, "spec.nodeName", func(o client.Object) []string {
return []string{o.(*corev1.Pod).Spec.NodeName}
}), "failed to setup pod indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Node{}, "spec.providerID", func(o client.Object) []string {
return []string{o.(*corev1.Node).Spec.ProviderID}
}), "failed to setup node provider id indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "status.providerID", func(o client.Object) []string {
return []string{o.(*v1.NodeClaim).Status.ProviderID}
}), "failed to setup nodeclaim provider id indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "spec.nodeClassRef.group", func(o client.Object) []string {
return []string{o.(*v1.NodeClaim).Spec.NodeClassRef.Group}
}), "failed to setup nodeclaim nodeclassref apiversion indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "spec.nodeClassRef.kind", func(o client.Object) []string {
return []string{o.(*v1.NodeClaim).Spec.NodeClassRef.Kind}
}), "failed to setup nodeclaim nodeclassref kind indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "spec.nodeClassRef.name", func(o client.Object) []string {
return []string{o.(*v1.NodeClaim).Spec.NodeClassRef.Name}
}), "failed to setup nodeclaim nodeclassref name indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodePool{}, "spec.template.spec.nodeClassRef.group", func(o client.Object) []string {
return []string{o.(*v1.NodePool).Spec.Template.Spec.NodeClassRef.Group}
}), "failed to setup nodepool nodeclassref apiversion indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodePool{}, "spec.template.spec.nodeClassRef.kind", func(o client.Object) []string {
return []string{o.(*v1.NodePool).Spec.Template.Spec.NodeClassRef.Kind}
}), "failed to setup nodepool nodeclassref kind indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodePool{}, "spec.template.spec.nodeClassRef.name", func(o client.Object) []string {
return []string{o.(*v1.NodePool).Spec.Template.Spec.NodeClassRef.Name}
}), "failed to setup nodepool nodeclassref name indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(o client.Object) []string {
return []string{o.(*storagev1.VolumeAttachment).Spec.NodeName}
}), "failed to setup volumeattachment indexer")

setupIndexers(ctx, mgr)

lo.Must0(mgr.AddReadyzCheck("manager", func(req *http.Request) error {
return lo.Ternary(mgr.GetCache().WaitForCacheSync(req.Context()), nil, fmt.Errorf("failed to sync caches"))
}))
lo.Must0(mgr.AddReadyzCheck("crd", func(_ *http.Request) error {
objects := []client.Object{&v1.NodePool{}, &v1.NodeClaim{}}
for _, obj := range objects {
gvk, err := apiutil.GVKForObject(obj, scheme.Scheme)
if err != nil {
return err
}
if _, err := mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
return err
}
}
return nil
}))
lo.Must0(mgr.AddHealthzCheck("healthz", healthz.Ping))
lo.Must0(mgr.AddReadyzCheck("readyz", healthz.Ping))

Expand Down Expand Up @@ -226,3 +216,48 @@ func (o *Operator) Start(ctx context.Context) {
}()
wg.Wait()
}

func setupIndexers(ctx context.Context, mgr manager.Manager) {
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Pod{}, "spec.nodeName", func(o client.Object) []string {
return []string{o.(*corev1.Pod).Spec.NodeName}
}), "failed to setup pod indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &corev1.Node{}, "spec.providerID", func(o client.Object) []string {
return []string{o.(*corev1.Node).Spec.ProviderID}
}), "failed to setup node provider id indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(o client.Object) []string {
return []string{o.(*storagev1.VolumeAttachment).Spec.NodeName}
}), "failed to setup volumeattachment indexer")

// If the CRD does not exist, we should fail open when setting up indexers. This ensures controllers that aren't reliant on those CRDs may continue to function
handleCRDIndexerError := func(err error, msg string) {
noKindMatchError := &meta.NoKindMatchError{}
if errors.As(err, &noKindMatchError) {
log.FromContext(ctx).Error(err, msg)
} else if err != nil {
// lo.Must0 also does a panic
panic(fmt.Sprintf("%s, %s", err, msg))
}
}
handleCRDIndexerError(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "status.providerID", func(o client.Object) []string {
return []string{o.(*v1.NodeClaim).Status.ProviderID}
}), "failed to setup nodeclaim provider id indexer")
handleCRDIndexerError(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "spec.nodeClassRef.group", func(o client.Object) []string {
return []string{o.(*v1.NodeClaim).Spec.NodeClassRef.Group}
}), "failed to setup nodeclaim nodeclassref apiversion indexer")
handleCRDIndexerError(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "spec.nodeClassRef.kind", func(o client.Object) []string {
return []string{o.(*v1.NodeClaim).Spec.NodeClassRef.Kind}
}), "failed to setup nodeclaim nodeclassref kind indexer")
handleCRDIndexerError(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodeClaim{}, "spec.nodeClassRef.name", func(o client.Object) []string {
return []string{o.(*v1.NodeClaim).Spec.NodeClassRef.Name}
}), "failed to setup nodeclaim nodeclassref name indexer")

handleCRDIndexerError(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodePool{}, "spec.template.spec.nodeClassRef.group", func(o client.Object) []string {
return []string{o.(*v1.NodePool).Spec.Template.Spec.NodeClassRef.Group}
}), "failed to setup nodepool nodeclassref apiversion indexer")
handleCRDIndexerError(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodePool{}, "spec.template.spec.nodeClassRef.kind", func(o client.Object) []string {
return []string{o.(*v1.NodePool).Spec.Template.Spec.NodeClassRef.Kind}
}), "failed to setup nodepool nodeclassref kind indexer")
handleCRDIndexerError(mgr.GetFieldIndexer().IndexField(ctx, &v1.NodePool{}, "spec.template.spec.nodeClassRef.name", func(o client.Object) []string {
return []string{o.(*v1.NodePool).Spec.Template.Spec.NodeClassRef.Name}
}), "failed to setup nodepool nodeclassref name indexer")
}
Loading