diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index fe9247265f..6b36dc05db 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -525,8 +525,7 @@ func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []inform return opts } -func CreateControllerManager(ctx context.Context, cfg *config.Config, options manager.Options) (*manager.Manager, error) { - +func CreateControllerManager(ctx context.Context, cfg *config.Config, options manager.Options) (manager.Manager, error) { _, kubecfg, err := utils.GetKubeConfig(ctx, cfg) if err != nil { return nil, errors.Wrapf(err, "error building Kubernetes Clientset") @@ -536,22 +535,23 @@ func CreateControllerManager(ctx context.Context, cfg *config.Config, options ma if err != nil { return nil, errors.Wrapf(err, "failed to initialize controller-runtime manager") } - return &mgr, nil + + return mgr, nil } // StartControllerManager Start controller runtime manager to start listening to resource changes. // K8sPluginManager uses controller runtime to create informers for the CRDs being monitored by plugins. The informer // EventHandler enqueues the owner workflow for reevaluation. These informer events allow propeller to detect // workflow changes faster than the default sync interval for workflow CRDs. -func StartControllerManager(ctx context.Context, mgr *manager.Manager) error { +func StartControllerManager(ctx context.Context, mgr manager.Manager) error { ctx = contextutils.WithGoroutineLabel(ctx, "controller-runtime-manager") pprof.SetGoroutineLabels(ctx) logger.Infof(ctx, "Starting controller-runtime manager") - return (*mgr).Start(ctx) + return mgr.Start(ctx) } // StartController creates a new FlytePropeller Controller and starts it -func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string, mgr *manager.Manager, scope *promutils.Scope) error { +func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string, mgr manager.Manager, scope *promutils.Scope) error { // Setup cancel on the context ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -591,7 +591,7 @@ func StartController(ctx context.Context, cfg *config.Config, defaultNamespace s informerFactory := k8sInformers.NewSharedInformerFactoryWithOptions(kubeClient, flyteK8sConfig.GetK8sPluginConfig().DefaultPodTemplateResync.Duration) - c, err := New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, informerFactory, *mgr, *scope) + c, err := New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, informerFactory, mgr, *scope) if err != nil { return errors.Wrap(err, "failed to start FlytePropeller") } else if c == nil { diff --git a/flytepropeller/pkg/webhook/entrypoint.go b/flytepropeller/pkg/webhook/entrypoint.go index 466cbbde33..ad2c21d6a1 100644 --- a/flytepropeller/pkg/webhook/entrypoint.go +++ b/flytepropeller/pkg/webhook/entrypoint.go @@ -5,11 +5,10 @@ import ( "encoding/json" errors2 "errors" "fmt" - "os" - "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "os" "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" @@ -25,7 +24,7 @@ const ( ) func Run(ctx context.Context, propellerCfg *config.Config, cfg *config2.Config, - defaultNamespace string, scope *promutils.Scope, mgr *manager.Manager) error { + defaultNamespace string, scope *promutils.Scope, mgr manager.Manager) error { raw, err := json.Marshal(cfg) if err != nil { return err @@ -40,7 +39,7 @@ func Run(ctx context.Context, propellerCfg *config.Config, cfg *config2.Config, webhookScope := (*scope).NewSubScope("webhook") - secretsWebhook := NewPodMutator(cfg, webhookScope) + secretsWebhook := NewPodMutator(cfg, mgr.GetScheme(), webhookScope) // Creates a MutationConfig to instruct ApiServer to call this service whenever a Pod is being created. err = createMutationConfig(ctx, kubeClient, secretsWebhook, defaultNamespace) @@ -48,7 +47,7 @@ func Run(ctx context.Context, propellerCfg *config.Config, cfg *config2.Config, return err } - err = secretsWebhook.Register(ctx, *mgr) + err = secretsWebhook.Register(ctx, mgr) if err != nil { logger.Fatalf(ctx, "Failed to register webhook with manager. Error: %v", err) } diff --git a/flytepropeller/pkg/webhook/pod.go b/flytepropeller/pkg/webhook/pod.go index 556b6053d9..31fca8f9c7 100644 --- a/flytepropeller/pkg/webhook/pod.go +++ b/flytepropeller/pkg/webhook/pod.go @@ -31,6 +31,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "k8s.io/apimachinery/pkg/runtime" "net/http" "os" "path/filepath" @@ -40,7 +41,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -70,17 +70,7 @@ type Mutator interface { Mutate(ctx context.Context, p *corev1.Pod) (newP *corev1.Pod, changed bool, err error) } -func (pm *PodMutator) InjectClient(_ client.Client) error { - return nil -} - -// InjectDecoder injects the decoder into a mutatingHandler. -func (pm *PodMutator) InjectDecoder(d *admission.Decoder) error { - pm.decoder = d - return nil -} - -func (pm *PodMutator) Handle(ctx context.Context, request admission.Request) admission.Response { +func (pm PodMutator) Handle(ctx context.Context, request admission.Request) admission.Response { // Get the object in the request obj := &corev1.Pod{} err := pm.decoder.Decode(request, obj) @@ -132,7 +122,7 @@ func (pm PodMutator) Mutate(ctx context.Context, p *corev1.Pod) (newP *corev1.Po return newP, changed, nil } -func (pm *PodMutator) Register(ctx context.Context, mgr manager.Manager) error { +func (pm PodMutator) Register(ctx context.Context, mgr manager.Manager) error { wh := &admission.Webhook{ Handler: pm, } @@ -220,9 +210,10 @@ func (pm PodMutator) CreateMutationWebhookConfiguration(namespace string) (*admi return mutateConfig, nil } -func NewPodMutator(cfg *config.Config, scope promutils.Scope) *PodMutator { +func NewPodMutator(cfg *config.Config, scheme *runtime.Scheme, scope promutils.Scope) *PodMutator { return &PodMutator{ - cfg: cfg, + decoder: admission.NewDecoder(scheme), + cfg: cfg, Mutators: []MutatorConfig{ { Mutator: NewSecretsMutator(cfg, scope.NewSubScope("secrets")), diff --git a/flytepropeller/pkg/webhook/pod_test.go b/flytepropeller/pkg/webhook/pod_test.go index 122f66c82f..53d8cdee7e 100644 --- a/flytepropeller/pkg/webhook/pod_test.go +++ b/flytepropeller/pkg/webhook/pod_test.go @@ -85,7 +85,7 @@ func Test_CreateMutationWebhookConfiguration(t *testing.T) { pm := NewPodMutator(&config.Config{ CertDir: "testdata", ServiceName: "my-service", - }, promutils.NewTestScope()) + }, latest.Scheme, promutils.NewTestScope()) t.Run("Empty namespace", func(t *testing.T) { c, err := pm.CreateMutationWebhookConfiguration("") @@ -104,10 +104,7 @@ func Test_Handle(t *testing.T) { pm := NewPodMutator(&config.Config{ CertDir: "testdata", ServiceName: "my-service", - }, promutils.NewTestScope()) - - decoder := admission.NewDecoder(latest.Scheme) - assert.NoError(t, pm.InjectDecoder(decoder)) + }, latest.Scheme, promutils.NewTestScope()) req := admission.Request{ AdmissionRequest: admissionv1.AdmissionRequest{