Skip to content

Commit

Permalink
Merge branch 'master' of github.com:flyteorg/flyte into failure_node
Browse files Browse the repository at this point in the history
  • Loading branch information
pingsutw committed Dec 1, 2023
2 parents 4c09a50 + a8f4904 commit d6b4b1d
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 67 deletions.
16 changes: 12 additions & 4 deletions cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package single
import (
"context"
"net/http"
"os"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook"

Expand Down Expand Up @@ -40,6 +41,7 @@ import (
)

const defaultNamespace = "all"
const propellerDefaultNamespace = "flyte"

func startDataCatalog(ctx context.Context, _ DataCatalog) error {
if err := datacatalogRepo.Migrate(ctx); err != nil {
Expand Down Expand Up @@ -120,7 +122,7 @@ func startPropeller(ctx context.Context, cfg Propeller) error {
SyncPeriod: &propellerCfg.DownstreamEval.Duration,
DefaultNamespaces: namespaceConfigs,
},
NewCache: func (config *rest.Config, options cache.Options) (cache.Cache, error) {
NewCache: func(config *rest.Config, options cache.Options) (cache.Cache, error) {
k8sCache, err := cache.New(config, options)
if err != nil {
return k8sCache, err
Expand All @@ -141,7 +143,7 @@ func startPropeller(ctx context.Context, cfg Propeller) error {
BindAddress: "0",
},
WebhookServer: ctrlWebhook.NewServer(ctrlWebhook.Options{
CertDir: webhookConfig.GetConfig().CertDir,
CertDir: webhookConfig.GetConfig().ExpandCertDir(),
Port: webhookConfig.GetConfig().ListenPort,
}),
}
Expand All @@ -162,7 +164,13 @@ func startPropeller(ctx context.Context, cfg Propeller) error {
return err
}
logger.Infof(childCtx, "Starting Webhook server...")
return webhookEntrypoint.Run(signals.SetupSignalHandler(childCtx), propellerCfg, webhookConfig.GetConfig(), defaultNamespace, &propellerScope, mgr)
// set default namespace for pod template store
podNamespace, found := os.LookupEnv(webhookEntrypoint.PodNamespaceEnvVar)
if !found {
podNamespace = propellerDefaultNamespace
}

return webhookEntrypoint.Run(signals.SetupSignalHandler(childCtx), propellerCfg, webhookConfig.GetConfig(), podNamespace, &propellerScope, mgr)
})
}

Expand Down Expand Up @@ -207,7 +215,7 @@ var startCmd = &cobra.Command{
for _, serviceName := range []string{otelutils.AdminClientTracer, otelutils.AdminGormTracer, otelutils.AdminServerTracer,
otelutils.BlobstoreClientTracer, otelutils.DataCatalogClientTracer, otelutils.DataCatalogGormTracer,
otelutils.DataCatalogServerTracer, otelutils.FlytePropellerTracer, otelutils.K8sClientTracer} {
if err := otelutils.RegisterTracerProvider(serviceName, otelutils.GetConfig()) ; err != nil {
if err := otelutils.RegisterTracerProvider(serviceName, otelutils.GetConfig()); err != nil {
logger.Errorf(ctx, "Failed to create otel tracer provider. %v", err)
return err
}
Expand Down
3 changes: 2 additions & 1 deletion flyteadmin/pkg/runtime/cluster_resource_provider.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package runtime

import (
"os"
"time"

"github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
Expand All @@ -21,7 +22,7 @@ var clusterResourceConfig = config.MustRegisterSection(clusterResourceKey, &inte
type ClusterResourceConfigurationProvider struct{}

func (p *ClusterResourceConfigurationProvider) GetTemplatePath() string {
return clusterResourceConfig.GetConfig().(*interfaces.ClusterResourceConfig).TemplatePath
return os.ExpandEnv(clusterResourceConfig.GetConfig().(*interfaces.ClusterResourceConfig).TemplatePath)
}

func (p *ClusterResourceConfigurationProvider) GetTemplateData() interfaces.TemplateData {
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
BindAddress: "0",
},
WebhookServer: ctrlWebhook.NewServer(ctrlWebhook.Options{
CertDir: cfg.CertDir,
CertDir: cfg.ExpandCertDir(),
Port: cfg.ListenPort,
}),
}
Expand Down
14 changes: 7 additions & 7 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,7 @@ func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []inform
return opts
}

func CreateControllerManager(ctx context.Context, cfg *config.Config, options manager.Options) (*manager.Manager, error) {

func CreateControllerManager(ctx context.Context, cfg *config.Config, options manager.Options) (manager.Manager, error) {
_, kubecfg, err := utils.GetKubeConfig(ctx, cfg)
if err != nil {
return nil, errors.Wrapf(err, "error building Kubernetes Clientset")
Expand All @@ -536,22 +535,23 @@ func CreateControllerManager(ctx context.Context, cfg *config.Config, options ma
if err != nil {
return nil, errors.Wrapf(err, "failed to initialize controller-runtime manager")
}
return &mgr, nil

return mgr, nil
}

// StartControllerManager starts the controller-runtime manager so that it begins listening to resource changes.
// K8sPluginManager uses controller runtime to create informers for the CRDs being monitored by plugins. The informer
// EventHandler enqueues the owner workflow for reevaluation. These informer events allow propeller to detect
// workflow changes faster than the default sync interval for workflow CRDs.
func StartControllerManager(ctx context.Context, mgr *manager.Manager) error {
func StartControllerManager(ctx context.Context, mgr manager.Manager) error {
ctx = contextutils.WithGoroutineLabel(ctx, "controller-runtime-manager")
pprof.SetGoroutineLabels(ctx)
logger.Infof(ctx, "Starting controller-runtime manager")
return (*mgr).Start(ctx)
return mgr.Start(ctx)
}

// StartController creates a new FlytePropeller Controller and starts it
func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string, mgr *manager.Manager, scope *promutils.Scope) error {
func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string, mgr manager.Manager, scope *promutils.Scope) error {
// Setup cancel on the context
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -591,7 +591,7 @@ func StartController(ctx context.Context, cfg *config.Config, defaultNamespace s

informerFactory := k8sInformers.NewSharedInformerFactoryWithOptions(kubeClient, flyteK8sConfig.GetK8sPluginConfig().DefaultPodTemplateResync.Duration)

c, err := New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, informerFactory, *mgr, *scope)
c, err := New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, informerFactory, mgr, *scope)
if err != nil {
return errors.Wrap(err, "failed to start FlytePropeller")
} else if c == nil {
Expand Down
5 changes: 5 additions & 0 deletions flytepropeller/pkg/webhook/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"os"

"github.com/flyteorg/flyte/flytestdlib/config"
)
Expand Down Expand Up @@ -103,6 +104,10 @@ type Config struct {
VaultSecretManagerConfig VaultSecretManagerConfig `json:"vaultSecretManager" pflag:",Vault Secret Manager config."`
}

// ExpandCertDir returns the configured CertDir after passing it through
// os.ExpandEnv, so that $VAR and ${VAR} references in the configured path are
// replaced with the values of the corresponding environment variables.
// References to unset variables are replaced with the empty string.
func (c Config) ExpandCertDir() string {
	expanded := os.ExpandEnv(c.CertDir)
	return expanded
}

type AWSSecretManagerConfig struct {
SidecarImage string `json:"sidecarImage" pflag:",Specifies the sidecar docker image to use"`
Resources corev1.ResourceRequirements `json:"resources" pflag:"-,Specifies resource requirements for the init container."`
Expand Down
9 changes: 4 additions & 5 deletions flytepropeller/pkg/webhook/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"encoding/json"
errors2 "errors"
"fmt"
"os"

"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"os"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
Expand All @@ -25,7 +24,7 @@ const (
)

func Run(ctx context.Context, propellerCfg *config.Config, cfg *config2.Config,
defaultNamespace string, scope *promutils.Scope, mgr *manager.Manager) error {
defaultNamespace string, scope *promutils.Scope, mgr manager.Manager) error {
raw, err := json.Marshal(cfg)
if err != nil {
return err
Expand All @@ -40,15 +39,15 @@ func Run(ctx context.Context, propellerCfg *config.Config, cfg *config2.Config,

webhookScope := (*scope).NewSubScope("webhook")

secretsWebhook := NewPodMutator(cfg, webhookScope)
secretsWebhook := NewPodMutator(cfg, mgr.GetScheme(), webhookScope)

// Creates a MutationConfig to instruct ApiServer to call this service whenever a Pod is being created.
err = createMutationConfig(ctx, kubeClient, secretsWebhook, defaultNamespace)
if err != nil {
return err
}

err = secretsWebhook.Register(ctx, *mgr)
err = secretsWebhook.Register(ctx, mgr)
if err != nil {
logger.Fatalf(ctx, "Failed to register webhook with manager. Error: %v", err)
}
Expand Down
11 changes: 6 additions & 5 deletions flytepropeller/pkg/webhook/init_cert.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,22 @@ func createWebhookSecret(ctx context.Context, namespace string, cfg *webhookConf
}

if cfg.LocalCert {
if _, err := os.Stat(cfg.CertDir); os.IsNotExist(err) {
if err := os.Mkdir(cfg.CertDir, folderPerm); err != nil {
certPath := cfg.ExpandCertDir()
if _, err := os.Stat(certPath); os.IsNotExist(err) {
if err := os.Mkdir(certPath, folderPerm); err != nil {
return err
}
}

if err := os.WriteFile(path.Join(cfg.CertDir, CaCertKey), certs.CaPEM.Bytes(), permission); err != nil {
if err := os.WriteFile(path.Join(certPath, CaCertKey), certs.CaPEM.Bytes(), permission); err != nil {
return err
}

if err := os.WriteFile(path.Join(cfg.CertDir, ServerCertKey), certs.ServerPEM.Bytes(), permission); err != nil {
if err := os.WriteFile(path.Join(certPath, ServerCertKey), certs.ServerPEM.Bytes(), permission); err != nil {
return err
}

if err := os.WriteFile(path.Join(cfg.CertDir, ServerCertPrivateKey), certs.PrivateKeyPEM.Bytes(), permission); err != nil {
if err := os.WriteFile(path.Join(certPath, ServerCertPrivateKey), certs.PrivateKeyPEM.Bytes(), permission); err != nil {
return err
}
}
Expand Down
37 changes: 14 additions & 23 deletions flytepropeller/pkg/webhook/pod.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
// The PodMutator is a controller-runtime webhook that intercepts Pod Creation events and mutates them. Currently, there
// is only one registered Mutator, that's the SecretsMutator. It works as follows:
// Package webhook contains PodMutator. It's a controller-runtime webhook that intercepts Pod Creation events and
// mutates them. Currently, there is only one registered Mutator, that's the SecretsMutator. It works as follows:
//
// - The Webhook only works on Pods. If propeller/plugins launch a resource outside of K8s (or in a separate k8s
// - The Webhook only works on Pods. If propeller/plugins launch a resource outside K8s (or in a separate k8s
// cluster), it's the responsibility of the plugin to correctly pass secret injection information.
// - When a k8s-plugin builds a resource, propeller's PluginManager will automatically inject a label `inject-flyte
// -secrets: true` and serialize the secret injection information into the annotations.
// - If a plugin does not use the K8sPlugin interface, it's its responsibility to pass secret injection information.
// - If a k8s plugin creates a CRD that launches other Pods (e.g. Spark/PyTorch... etc.), it's its responsibility to
// make sure the labels/annotations set on the CRD by PluginManager are propagated to those launched Pods. This
// ensures secret injection happens no matter how many levels of indirections there are.
// - The Webhook expects 'inject-flyte-secrets: true' as a label on the Pod. Otherwise it won't listen/observe that pod.
// - The Webhook expects 'inject-flyte-secrets: true' as a label on the Pod. Otherwise, it won't listen/observe that
// pod.
// - Once it intercepts the admission request, it goes over all registered Mutators and invokes them in the order they
// are registered as. If a Mutator fails and it's marked as `required`, the operation will fail and the admission
// are registered as. If a Mutator fails, and it's marked as `required`, the operation will fail and the admission
// will be rejected.
// - The SecretsMutator will attempt to lookup the requested secret from the process environment. If the secret is
// - The SecretsMutator will attempt to look up the requested secret from the process environment. If the secret is
// already mounted, it'll inject it as plain-text into the Pod Spec (Less secure).
// - If it's not found in the environment it'll, instead, fall back to the enabled Secrets Injector (K8s, Confidant,
// Vault... etc.).
Expand All @@ -30,7 +31,7 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"k8s.io/apimachinery/pkg/runtime"
"net/http"
"os"
"path/filepath"
Expand All @@ -40,7 +41,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

Expand Down Expand Up @@ -70,17 +70,7 @@ type Mutator interface {
Mutate(ctx context.Context, p *corev1.Pod) (newP *corev1.Pod, changed bool, err error)
}

func (pm *PodMutator) InjectClient(_ client.Client) error {
return nil
}

// InjectDecoder injects the decoder into a mutatingHandler.
func (pm *PodMutator) InjectDecoder(d *admission.Decoder) error {
pm.decoder = d
return nil
}

func (pm *PodMutator) Handle(ctx context.Context, request admission.Request) admission.Response {
func (pm PodMutator) Handle(ctx context.Context, request admission.Request) admission.Response {
// Get the object in the request
obj := &corev1.Pod{}
err := pm.decoder.Decode(request, obj)
Expand Down Expand Up @@ -132,7 +122,7 @@ func (pm PodMutator) Mutate(ctx context.Context, p *corev1.Pod) (newP *corev1.Po
return newP, changed, nil
}

func (pm *PodMutator) Register(ctx context.Context, mgr manager.Manager) error {
func (pm PodMutator) Register(ctx context.Context, mgr manager.Manager) error {
wh := &admission.Webhook{
Handler: pm,
}
Expand All @@ -158,7 +148,7 @@ func generateMutatePath(gvk schema.GroupVersionKind) string {
}

func (pm PodMutator) CreateMutationWebhookConfiguration(namespace string) (*admissionregistrationv1.MutatingWebhookConfiguration, error) {
caBytes, err := ioutil.ReadFile(filepath.Join(pm.cfg.CertDir, "ca.crt"))
caBytes, err := os.ReadFile(filepath.Join(pm.cfg.ExpandCertDir(), "ca.crt"))
if err != nil {
// ca.crt is optional. If not provided, API Server will assume the webhook is serving SSL using a certificate
// issued by a known Cert Authority.
Expand Down Expand Up @@ -220,9 +210,10 @@ func (pm PodMutator) CreateMutationWebhookConfiguration(namespace string) (*admi
return mutateConfig, nil
}

func NewPodMutator(cfg *config.Config, scope promutils.Scope) *PodMutator {
func NewPodMutator(cfg *config.Config, scheme *runtime.Scheme, scope promutils.Scope) *PodMutator {
return &PodMutator{
cfg: cfg,
decoder: admission.NewDecoder(scheme),
cfg: cfg,
Mutators: []MutatorConfig{
{
Mutator: NewSecretsMutator(cfg, scope.NewSubScope("secrets")),
Expand Down
7 changes: 2 additions & 5 deletions flytepropeller/pkg/webhook/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func Test_CreateMutationWebhookConfiguration(t *testing.T) {
pm := NewPodMutator(&config.Config{
CertDir: "testdata",
ServiceName: "my-service",
}, promutils.NewTestScope())
}, latest.Scheme, promutils.NewTestScope())

t.Run("Empty namespace", func(t *testing.T) {
c, err := pm.CreateMutationWebhookConfiguration("")
Expand All @@ -104,10 +104,7 @@ func Test_Handle(t *testing.T) {
pm := NewPodMutator(&config.Config{
CertDir: "testdata",
ServiceName: "my-service",
}, promutils.NewTestScope())

decoder := admission.NewDecoder(latest.Scheme)
assert.NoError(t, pm.InjectDecoder(decoder))
}, latest.Scheme, promutils.NewTestScope())

req := admission.Request{
AdmissionRequest: admissionv1.AdmissionRequest{
Expand Down
12 changes: 2 additions & 10 deletions rsts/community/contribute.rst
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,7 @@ that integrates all Flyte components into a single binary.
go mod tidy
make compile
# Step3: Edit the config file: ./flyte-single-binary-local.yaml.
# Replace occurrences of $HOME with the actual path of your home directory.
sedi=(-i)
case "$(uname)" in
Darwin*) sedi=(-i "")
esac
sed "${sedi[@]}" -e "s|\$HOME|${HOME}|g" flyte-single-binary-local.yaml
# Step 4: Prepare a namespace template for the cluster resource controller.
# Step3: Prepare a namespace template for the cluster resource controller.
# The configuration file "flyte-single-binary-local.yaml" has an entry named cluster_resources.templatePath.
# This entry needs to point to a directory containing the templates for the cluster resource controller to use.
# We will now create a simple template that allows the automatic creation of required namespaces for projects.
Expand All @@ -409,7 +401,7 @@ that integrates all Flyte components into a single binary.
metadata:
name: '{{ namespace }}'" > $HOME/.flyte/cluster-resource-templates/namespace.yaml
# Step5: Running the single binary.
# Step4: Running the single binary.
# The POD_NAMESPACE environment variable is necessary for the webhook to function correctly.
# You may encounter an error due to `ERROR: duplicate key value violates unique constraint`. Running the command again will solve the problem.
POD_NAMESPACE=flyte ./flyte start --config flyte-single-binary-local.yaml
Expand Down
Loading

0 comments on commit d6b4b1d

Please sign in to comment.