Skip to content

Commit

Permalink
feat(propeller): Support multiple namespaces for limit-namespace
Browse files Browse the repository at this point in the history
Resolves: #5181
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed May 9, 2024
1 parent b55026e commit fcb0598
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 32 deletions.
12 changes: 6 additions & 6 deletions cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/http"
"os"
"strings"

metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -108,13 +109,12 @@ func startAdmin(ctx context.Context, cfg Admin) error {

func startPropeller(ctx context.Context, cfg Propeller) error {
propellerCfg := propellerConfig.GetConfig()
propellerScope := promutils.NewScope(propellerConfig.GetConfig().MetricsPrefix).NewSubScope("propeller").NewSubScope(propellerCfg.LimitNamespace)
limitNamespace := ""
var namespaceConfigs map[string]cache.Config
propellerScope := promutils.NewScope(propellerConfig.GetConfig().MetricsPrefix).NewSubScope("propeller").NewSubScope(strings.Replace(propellerCfg.LimitNamespace, ",", "-", -1))
namespaceConfigs := make(map[string]cache.Config)
if propellerCfg.LimitNamespace != defaultNamespace {
limitNamespace = propellerCfg.LimitNamespace
namespaceConfigs = map[string]cache.Config{
limitNamespace: {},
limitNamespaces := strings.Split(propellerCfg.LimitNamespace, ",")
for _, limitNamespace := range limitNamespaces {
namespaceConfigs[limitNamespace] = cache.Config{}
}
}

Expand Down
12 changes: 6 additions & 6 deletions flytepropeller/cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"os"
"runtime"
"strings"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -128,13 +129,12 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error {
}

// Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics.
propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(cfg.LimitNamespace)
limitNamespace := ""
var namespaceConfigs map[string]cache.Config
propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(strings.Replace(cfg.LimitNamespace, ",", "-", -1))
namespaceConfigs := make(map[string]cache.Config)
if cfg.LimitNamespace != defaultNamespace {
limitNamespace = cfg.LimitNamespace
namespaceConfigs = map[string]cache.Config{
limitNamespace: {},
limitNamespaces := strings.Split(cfg.LimitNamespace, ",")
for _, limitNamespace := range limitNamespaces {
namespaceConfigs[limitNamespace] = cache.Config{}
}
}

Expand Down
8 changes: 5 additions & 3 deletions flytepropeller/cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"strings"

"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -96,10 +97,11 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
}

webhookScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("webhook")
var namespaceConfigs map[string]cache.Config
namespaceConfigs := make(map[string]cache.Config)
if propellerCfg.LimitNamespace != defaultNamespace {
namespaceConfigs = map[string]cache.Config{
propellerCfg.LimitNamespace: {},
limitNamespaces := strings.Split(propellerCfg.LimitNamespace, ",")
for _, limitNamespace := range limitNamespaces {
namespaceConfigs[limitNamespace] = cache.Config{}
}
}

Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ type Config struct {
Workers int `json:"workers" pflag:",Number of threads to process workflows"`
WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:",Frequency of re-evaluating workflows"`
DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:",Frequency of re-evaluating downstream tasks"`
LimitNamespace string `json:"limit-namespace" pflag:",Namespaces to watch for this propeller"`
LimitNamespace string `json:"limit-namespace" pflag:",Namespaces to watch for this propeller (comma separated)"`
ProfilerPort config.Port `json:"prof-port" pflag:",Profiler port"`
MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."`
DefaultRawOutputPrefix string `json:"rawoutput-prefix" pflag:",a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored."`
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"runtime/pprof"
"strings"
"time"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand Down Expand Up @@ -528,7 +529,10 @@ func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []inform
}

if cfg.LimitNamespace != defaultNamespace {
opts = append(opts, informers.WithNamespace(cfg.LimitNamespace))
limitNamespaces := strings.Split(cfg.LimitNamespace, ",")
for _, limitNamespace := range limitNamespaces {
opts = append(opts, informers.WithNamespace(limitNamespace))

Check warning on line 534 in flytepropeller/pkg/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/controller.go#L532-L534

Added lines #L532 - L534 were not covered by tests
}
}
return opts
}
Expand Down
50 changes: 36 additions & 14 deletions flytepropeller/pkg/controller/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,30 @@ func (g *GarbageCollector) deleteWorkflows(ctx context.Context) error {
s.MatchExpressions = append(s.MatchExpressions, g.labelSelectorRequirements...)
}

// Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each.
if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" {
// Delete doesn't support 'all' namespaces and comma-separated namespaces. Let's fetch namespaces and loop over each.
if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" || strings.Contains(g.namespace, ",") {
namespaceList, err := g.namespaceClient.List(ctx, v1.ListOptions{})
if err != nil {
return err
}
for _, n := range namespaceList.Items {
namespaceCtx := contextutils.WithNamespace(ctx, n.GetName())
logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", n.GetName())

if err := g.deleteWorkflowsForNamespace(ctx, n.GetName(), s); err != nil {
var namespaces []string
if strings.Contains(g.namespace, ",") {
namespaces = strings.Split(g.namespace, ",")

Check warning on line 56 in flytepropeller/pkg/controller/garbage_collector.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/garbage_collector.go#L56

Added line #L56 was not covered by tests
} else {
namespaces = make([]string, 0)
for _, n := range namespaceList.Items {
namespaces = append(namespaces, n.GetName())
}
}

for _, namespace := range namespaces {
namespaceCtx := contextutils.WithNamespace(ctx, namespace)
logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", namespace)

if err := g.deleteWorkflowsForNamespace(ctx, namespace, s); err != nil {
g.metrics.gcRoundFailure.Inc(namespaceCtx)
logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", n.GetName(), err)
logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", namespace, err)

Check warning on line 70 in flytepropeller/pkg/controller/garbage_collector.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/garbage_collector.go#L70

Added line #L70 was not covered by tests
} else {
g.metrics.gcRoundSuccess.Inc(namespaceCtx)
}
Expand All @@ -81,19 +92,30 @@ func (g *GarbageCollector) deprecatedDeleteWorkflows(ctx context.Context) error
s.MatchExpressions = append(s.MatchExpressions, g.labelSelectorRequirements...)
}

// Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each.
if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" {
// Delete doesn't support 'all' namespaces and comma-separated namespaces. Let's fetch namespaces and loop over each.
if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" || strings.Contains(g.namespace, ",") {
namespaceList, err := g.namespaceClient.List(ctx, v1.ListOptions{})
if err != nil {
return err
}
for _, n := range namespaceList.Items {
namespaceCtx := contextutils.WithNamespace(ctx, n.GetName())
logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", n.GetName())

if err := g.deleteWorkflowsForNamespace(ctx, n.GetName(), s); err != nil {
var namespaces []string
if strings.Contains(g.namespace, ",") {
namespaces = strings.Split(g.namespace, ",")

Check warning on line 104 in flytepropeller/pkg/controller/garbage_collector.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/garbage_collector.go#L104

Added line #L104 was not covered by tests
} else {
namespaces = make([]string, 0)
for _, n := range namespaceList.Items {
namespaces = append(namespaces, n.GetName())
}
}

for _, namespace := range namespaces {
namespaceCtx := contextutils.WithNamespace(ctx, namespace)
logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", namespace)

if err := g.deleteWorkflowsForNamespace(ctx, namespace, s); err != nil {
g.metrics.gcRoundFailure.Inc(namespaceCtx)
logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", n.GetName(), err)
logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", namespace, err)

Check warning on line 118 in flytepropeller/pkg/controller/garbage_collector.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/garbage_collector.go#L118

Added line #L118 was not covered by tests
} else {
g.metrics.gcRoundSuccess.Inc(namespaceCtx)
}
Expand Down

0 comments on commit fcb0598

Please sign in to comment.