Skip to content

Commit

Permalink
Merge branch 'master' into flyte-start-errors
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario authored Nov 30, 2023
2 parents e615605 + 0541851 commit c2b90e1
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 32 deletions.
14 changes: 7 additions & 7 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,7 @@ func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []inform
return opts
}

func CreateControllerManager(ctx context.Context, cfg *config.Config, options manager.Options) (*manager.Manager, error) {

func CreateControllerManager(ctx context.Context, cfg *config.Config, options manager.Options) (manager.Manager, error) {
_, kubecfg, err := utils.GetKubeConfig(ctx, cfg)
if err != nil {
return nil, errors.Wrapf(err, "error building Kubernetes Clientset")
Expand All @@ -536,22 +535,23 @@ func CreateControllerManager(ctx context.Context, cfg *config.Config, options ma
if err != nil {
return nil, errors.Wrapf(err, "failed to initialize controller-runtime manager")
}
return &mgr, nil

return mgr, nil
}

// StartControllerManager Start controller runtime manager to start listening to resource changes.
// K8sPluginManager uses controller runtime to create informers for the CRDs being monitored by plugins. The informer
// EventHandler enqueues the owner workflow for reevaluation. These informer events allow propeller to detect
// workflow changes faster than the default sync interval for workflow CRDs.
func StartControllerManager(ctx context.Context, mgr *manager.Manager) error {
func StartControllerManager(ctx context.Context, mgr manager.Manager) error {
ctx = contextutils.WithGoroutineLabel(ctx, "controller-runtime-manager")
pprof.SetGoroutineLabels(ctx)
logger.Infof(ctx, "Starting controller-runtime manager")
return (*mgr).Start(ctx)
return mgr.Start(ctx)
}

// StartController creates a new FlytePropeller Controller and starts it
func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string, mgr *manager.Manager, scope *promutils.Scope) error {
func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string, mgr manager.Manager, scope *promutils.Scope) error {
// Setup cancel on the context
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -591,7 +591,7 @@ func StartController(ctx context.Context, cfg *config.Config, defaultNamespace s

informerFactory := k8sInformers.NewSharedInformerFactoryWithOptions(kubeClient, flyteK8sConfig.GetK8sPluginConfig().DefaultPodTemplateResync.Duration)

c, err := New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, informerFactory, *mgr, *scope)
c, err := New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, informerFactory, mgr, *scope)
if err != nil {
return errors.Wrap(err, "failed to start FlytePropeller")
} else if c == nil {
Expand Down
9 changes: 4 additions & 5 deletions flytepropeller/pkg/webhook/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"encoding/json"
errors2 "errors"
"fmt"
"os"

"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"os"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
Expand All @@ -25,7 +24,7 @@ const (
)

func Run(ctx context.Context, propellerCfg *config.Config, cfg *config2.Config,
defaultNamespace string, scope *promutils.Scope, mgr *manager.Manager) error {
defaultNamespace string, scope *promutils.Scope, mgr manager.Manager) error {
raw, err := json.Marshal(cfg)
if err != nil {
return err
Expand All @@ -40,15 +39,15 @@ func Run(ctx context.Context, propellerCfg *config.Config, cfg *config2.Config,

webhookScope := (*scope).NewSubScope("webhook")

secretsWebhook := NewPodMutator(cfg, webhookScope)
secretsWebhook := NewPodMutator(cfg, mgr.GetScheme(), webhookScope)

// Creates a MutationConfig to instruct ApiServer to call this service whenever a Pod is being created.
err = createMutationConfig(ctx, kubeClient, secretsWebhook, defaultNamespace)
if err != nil {
return err
}

err = secretsWebhook.Register(ctx, *mgr)
err = secretsWebhook.Register(ctx, mgr)
if err != nil {
logger.Fatalf(ctx, "Failed to register webhook with manager. Error: %v", err)
}
Expand Down
21 changes: 6 additions & 15 deletions flytepropeller/pkg/webhook/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"net/http"
"os"
"path/filepath"
Expand All @@ -39,7 +40,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

Expand Down Expand Up @@ -69,17 +69,7 @@ type Mutator interface {
Mutate(ctx context.Context, p *corev1.Pod) (newP *corev1.Pod, changed bool, err error)
}

func (pm *PodMutator) InjectClient(_ client.Client) error {
return nil
}

// InjectDecoder injects the decoder into a mutatingHandler.
func (pm *PodMutator) InjectDecoder(d *admission.Decoder) error {
pm.decoder = d
return nil
}

func (pm *PodMutator) Handle(ctx context.Context, request admission.Request) admission.Response {
func (pm PodMutator) Handle(ctx context.Context, request admission.Request) admission.Response {
// Get the object in the request
obj := &corev1.Pod{}
err := pm.decoder.Decode(request, obj)
Expand Down Expand Up @@ -131,7 +121,7 @@ func (pm PodMutator) Mutate(ctx context.Context, p *corev1.Pod) (newP *corev1.Po
return newP, changed, nil
}

func (pm *PodMutator) Register(ctx context.Context, mgr manager.Manager) error {
func (pm PodMutator) Register(ctx context.Context, mgr manager.Manager) error {
wh := &admission.Webhook{
Handler: pm,
}
Expand Down Expand Up @@ -219,9 +209,10 @@ func (pm PodMutator) CreateMutationWebhookConfiguration(namespace string) (*admi
return mutateConfig, nil
}

func NewPodMutator(cfg *config.Config, scope promutils.Scope) *PodMutator {
func NewPodMutator(cfg *config.Config, scheme *runtime.Scheme, scope promutils.Scope) *PodMutator {
return &PodMutator{
cfg: cfg,
decoder: admission.NewDecoder(scheme),
cfg: cfg,
Mutators: []MutatorConfig{
{
Mutator: NewSecretsMutator(cfg, scope.NewSubScope("secrets")),
Expand Down
7 changes: 2 additions & 5 deletions flytepropeller/pkg/webhook/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func Test_CreateMutationWebhookConfiguration(t *testing.T) {
pm := NewPodMutator(&config.Config{
CertDir: "testdata",
ServiceName: "my-service",
}, promutils.NewTestScope())
}, latest.Scheme, promutils.NewTestScope())

t.Run("Empty namespace", func(t *testing.T) {
c, err := pm.CreateMutationWebhookConfiguration("")
Expand All @@ -104,10 +104,7 @@ func Test_Handle(t *testing.T) {
pm := NewPodMutator(&config.Config{
CertDir: "testdata",
ServiceName: "my-service",
}, promutils.NewTestScope())

decoder := admission.NewDecoder(latest.Scheme)
assert.NoError(t, pm.InjectDecoder(decoder))
}, latest.Scheme, promutils.NewTestScope())

req := admission.Request{
AdmissionRequest: admissionv1.AdmissionRequest{
Expand Down

0 comments on commit c2b90e1

Please sign in to comment.