diff --git a/cmd/single/start.go b/cmd/single/start.go index d415f82111..7be3bacfb9 100644 --- a/cmd/single/start.go +++ b/cmd/single/start.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "os" + "strings" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -108,13 +109,12 @@ func startAdmin(ctx context.Context, cfg Admin) error { func startPropeller(ctx context.Context, cfg Propeller) error { propellerCfg := propellerConfig.GetConfig() - propellerScope := promutils.NewScope(propellerConfig.GetConfig().MetricsPrefix).NewSubScope("propeller").NewSubScope(propellerCfg.LimitNamespace) - limitNamespace := "" - var namespaceConfigs map[string]cache.Config + propellerScope := promutils.NewScope(propellerConfig.GetConfig().MetricsPrefix).NewSubScope("propeller").NewSubScope(strings.Replace(propellerCfg.LimitNamespace, ",", "-", -1)) + namespaceConfigs := make(map[string]cache.Config) if propellerCfg.LimitNamespace != defaultNamespace { - limitNamespace = propellerCfg.LimitNamespace - namespaceConfigs = map[string]cache.Config{ - limitNamespace: {}, + limitNamespaces := strings.Split(propellerCfg.LimitNamespace, ",") + for _, limitNamespace := range limitNamespaces { + namespaceConfigs[limitNamespace] = cache.Config{} } } diff --git a/flytepropeller/cmd/controller/cmd/root.go b/flytepropeller/cmd/controller/cmd/root.go index e1069650ad..31c69bc20b 100644 --- a/flytepropeller/cmd/controller/cmd/root.go +++ b/flytepropeller/cmd/controller/cmd/root.go @@ -7,6 +7,7 @@ import ( "net/http" "os" "runtime" + "strings" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" @@ -128,13 +129,12 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error { } // Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics. - propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(cfg.LimitNamespace) - limitNamespace := "" - var namespaceConfigs map[string]cache.Config + propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(strings.Replace(cfg.LimitNamespace, ",", "-", -1)) + namespaceConfigs := make(map[string]cache.Config) if cfg.LimitNamespace != defaultNamespace { - limitNamespace = cfg.LimitNamespace - namespaceConfigs = map[string]cache.Config{ - limitNamespace: {}, + limitNamespaces := strings.Split(cfg.LimitNamespace, ",") + for _, limitNamespace := range limitNamespaces { + namespaceConfigs[limitNamespace] = cache.Config{} } } diff --git a/flytepropeller/cmd/controller/cmd/webhook.go b/flytepropeller/cmd/controller/cmd/webhook.go index ae538385fb..fc312f7197 100644 --- a/flytepropeller/cmd/controller/cmd/webhook.go +++ b/flytepropeller/cmd/controller/cmd/webhook.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "strings" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" @@ -96,10 +97,11 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w } webhookScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("webhook") - var namespaceConfigs map[string]cache.Config + namespaceConfigs := make(map[string]cache.Config) if propellerCfg.LimitNamespace != defaultNamespace { - namespaceConfigs = map[string]cache.Config{ - propellerCfg.LimitNamespace: {}, + limitNamespaces := strings.Split(propellerCfg.LimitNamespace, ",") + for _, limitNamespace := range limitNamespaces { + namespaceConfigs[limitNamespace] = cache.Config{} } } diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 419386eddd..99d4ef3366 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -132,7 +132,7 @@ type Config struct { Workers int `json:"workers" pflag:",Number of threads to process workflows"` WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:",Frequency of re-evaluating workflows"` DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:",Frequency of re-evaluating downstream tasks"` - LimitNamespace string `json:"limit-namespace" pflag:",Namespaces to watch for this propeller"` + LimitNamespace string `json:"limit-namespace" pflag:",Namespaces to watch for this propeller (comma separated)"` ProfilerPort config.Port `json:"prof-port" pflag:",Profiler port"` MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."` DefaultRawOutputPrefix string `json:"rawoutput-prefix" pflag:",a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored."` diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index ea0b428c2f..e6c9b3ce4c 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -55,7 +55,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "workers"), defaultConfig.Workers, "Number of threads to process workflows") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "workflow-reeval-duration"), defaultConfig.WorkflowReEval.String(), "Frequency of re-evaluating workflows") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "downstream-eval-duration"), defaultConfig.DownstreamEval.String(), "Frequency of re-evaluating downstream tasks") - cmdFlags.String(fmt.Sprintf("%v%v", prefix, "limit-namespace"), defaultConfig.LimitNamespace, "Namespaces to watch for this propeller") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "limit-namespace"), defaultConfig.LimitNamespace, "Namespaces to watch for this propeller (comma separated)") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "prof-port"), defaultConfig.ProfilerPort.String(), "Profiler port") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "metadata-prefix"), defaultConfig.MetadataPrefix, "MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "rawoutput-prefix"), defaultConfig.DefaultRawOutputPrefix, "a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored.") diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index afa0c6e9ef..c4b978fc43 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "runtime/pprof" + "strings" "time" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" @@ -528,7 +529,10 @@ func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []inform } if cfg.LimitNamespace != defaultNamespace { - opts = append(opts, informers.WithNamespace(cfg.LimitNamespace)) + limitNamespaces := strings.Split(cfg.LimitNamespace, ",") + for _, limitNamespace := range limitNamespaces { + opts = append(opts, informers.WithNamespace(limitNamespace)) + } } return opts } diff --git a/flytepropeller/pkg/controller/garbage_collector.go b/flytepropeller/pkg/controller/garbage_collector.go index e02e1beca2..3ffaccecb9 100644 --- a/flytepropeller/pkg/controller/garbage_collector.go +++ b/flytepropeller/pkg/controller/garbage_collector.go @@ -44,19 +44,30 @@ func (g *GarbageCollector) deleteWorkflows(ctx context.Context) error { s.MatchExpressions = append(s.MatchExpressions, g.labelSelectorRequirements...) } - // Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each. - if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" { + // Delete doesn't support 'all' namespaces and comma-separated namespaces. Let's fetch namespaces and loop over each. + if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" || strings.Contains(g.namespace, ",") { namespaceList, err := g.namespaceClient.List(ctx, v1.ListOptions{}) if err != nil { return err } - for _, n := range namespaceList.Items { - namespaceCtx := contextutils.WithNamespace(ctx, n.GetName()) - logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", n.GetName()) - if err := g.deleteWorkflowsForNamespace(ctx, n.GetName(), s); err != nil { + var namespaces []string + if strings.Contains(g.namespace, ",") { + namespaces = strings.Split(g.namespace, ",") + } else { + namespaces = make([]string, 0) + for _, n := range namespaceList.Items { + namespaces = append(namespaces, n.GetName()) + } + } + + for _, namespace := range namespaces { + namespaceCtx := contextutils.WithNamespace(ctx, namespace) + logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", namespace) + + if err := g.deleteWorkflowsForNamespace(ctx, namespace, s); err != nil { g.metrics.gcRoundFailure.Inc(namespaceCtx) - logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", n.GetName(), err) + logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", namespace, err) } else { g.metrics.gcRoundSuccess.Inc(namespaceCtx) } @@ -81,19 +92,30 @@ func (g *GarbageCollector) deprecatedDeleteWorkflows(ctx context.Context) error s.MatchExpressions = append(s.MatchExpressions, g.labelSelectorRequirements...) } - // Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each. - if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" { + // Delete doesn't support 'all' namespaces and comma-separated namespaces. Let's fetch namespaces and loop over each. + if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" || strings.Contains(g.namespace, ",") { namespaceList, err := g.namespaceClient.List(ctx, v1.ListOptions{}) if err != nil { return err } - for _, n := range namespaceList.Items { - namespaceCtx := contextutils.WithNamespace(ctx, n.GetName()) - logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", n.GetName()) - if err := g.deleteWorkflowsForNamespace(ctx, n.GetName(), s); err != nil { + var namespaces []string + if strings.Contains(g.namespace, ",") { + namespaces = strings.Split(g.namespace, ",") + } else { + namespaces = make([]string, 0) + for _, n := range namespaceList.Items { + namespaces = append(namespaces, n.GetName()) + } + } + + for _, namespace := range namespaces { + namespaceCtx := contextutils.WithNamespace(ctx, namespace) + logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", namespace) + + if err := g.deleteWorkflowsForNamespace(ctx, namespace, s); err != nil { g.metrics.gcRoundFailure.Inc(namespaceCtx) - logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", n.GetName(), err) + logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", namespace, err) } else { g.metrics.gcRoundSuccess.Inc(namespaceCtx) }