Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(propeller): Support multiple namespaces for limit-namespace #5342

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/http"
"os"
"strings"

metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -108,13 +109,12 @@ func startAdmin(ctx context.Context, cfg Admin) error {

func startPropeller(ctx context.Context, cfg Propeller) error {
propellerCfg := propellerConfig.GetConfig()
propellerScope := promutils.NewScope(propellerConfig.GetConfig().MetricsPrefix).NewSubScope("propeller").NewSubScope(propellerCfg.LimitNamespace)
limitNamespace := ""
var namespaceConfigs map[string]cache.Config
propellerScope := promutils.NewScope(propellerConfig.GetConfig().MetricsPrefix).NewSubScope("propeller").NewSubScope(strings.Replace(propellerCfg.LimitNamespace, ",", "-", -1))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The , character is invalid in prometheus. Therefore I replace it with - here.

namespaceConfigs := make(map[string]cache.Config)
if propellerCfg.LimitNamespace != defaultNamespace {
limitNamespace = propellerCfg.LimitNamespace
namespaceConfigs = map[string]cache.Config{
limitNamespace: {},
limitNamespaces := strings.Split(propellerCfg.LimitNamespace, ",")
for _, limitNamespace := range limitNamespaces {
namespaceConfigs[limitNamespace] = cache.Config{}
}
}

Expand Down
12 changes: 6 additions & 6 deletions flytepropeller/cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"os"
"runtime"
"strings"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -128,13 +129,12 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error {
}

// Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics.
propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(cfg.LimitNamespace)
limitNamespace := ""
var namespaceConfigs map[string]cache.Config
propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(strings.Replace(cfg.LimitNamespace, ",", "-", -1))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

namespaceConfigs := make(map[string]cache.Config)
if cfg.LimitNamespace != defaultNamespace {
limitNamespace = cfg.LimitNamespace
namespaceConfigs = map[string]cache.Config{
limitNamespace: {},
limitNamespaces := strings.Split(cfg.LimitNamespace, ",")
for _, limitNamespace := range limitNamespaces {
namespaceConfigs[limitNamespace] = cache.Config{}
}
}

Expand Down
8 changes: 5 additions & 3 deletions flytepropeller/cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"strings"

"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -96,10 +97,11 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
}

webhookScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("webhook")
var namespaceConfigs map[string]cache.Config
namespaceConfigs := make(map[string]cache.Config)
if propellerCfg.LimitNamespace != defaultNamespace {
namespaceConfigs = map[string]cache.Config{
propellerCfg.LimitNamespace: {},
limitNamespaces := strings.Split(propellerCfg.LimitNamespace, ",")
for _, limitNamespace := range limitNamespaces {
namespaceConfigs[limitNamespace] = cache.Config{}
}
}

Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ type Config struct {
Workers int `json:"workers" pflag:",Number of threads to process workflows"`
WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:",Frequency of re-evaluating workflows"`
DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:",Frequency of re-evaluating downstream tasks"`
LimitNamespace string `json:"limit-namespace" pflag:",Namespaces to watch for this propeller"`
LimitNamespace string `json:"limit-namespace" pflag:",Namespaces to watch for this propeller (comma separated)"`
ProfilerPort config.Port `json:"prof-port" pflag:",Profiler port"`
MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."`
DefaultRawOutputPrefix string `json:"rawoutput-prefix" pflag:",a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored."`
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"fmt"
"os"
"runtime/pprof"
"strings"
"time"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand Down Expand Up @@ -528,7 +529,10 @@
}

if cfg.LimitNamespace != defaultNamespace {
opts = append(opts, informers.WithNamespace(cfg.LimitNamespace))
limitNamespaces := strings.Split(cfg.LimitNamespace, ",")
for _, limitNamespace := range limitNamespaces {
opts = append(opts, informers.WithNamespace(limitNamespace))

Check warning on line 534 in flytepropeller/pkg/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/controller.go#L532-L534

Added lines #L532 - L534 were not covered by tests
}
}
return opts
}
Expand Down
50 changes: 36 additions & 14 deletions flytepropeller/pkg/controller/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,30 @@
s.MatchExpressions = append(s.MatchExpressions, g.labelSelectorRequirements...)
}

// Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each.
if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" {
// Delete doesn't support 'all' namespaces and comma-separated namespaces. Let's fetch namespaces and loop over each.
if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" || strings.Contains(g.namespace, ",") {
namespaceList, err := g.namespaceClient.List(ctx, v1.ListOptions{})
if err != nil {
return err
}
for _, n := range namespaceList.Items {
namespaceCtx := contextutils.WithNamespace(ctx, n.GetName())
logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", n.GetName())

if err := g.deleteWorkflowsForNamespace(ctx, n.GetName(), s); err != nil {
var namespaces []string
if strings.Contains(g.namespace, ",") {
namespaces = strings.Split(g.namespace, ",")

Check warning on line 56 in flytepropeller/pkg/controller/garbage_collector.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/garbage_collector.go#L56

Added line #L56 was not covered by tests
} else {
namespaces = make([]string, 0)
for _, n := range namespaceList.Items {
namespaces = append(namespaces, n.GetName())
}
}
Comment on lines +55 to +62
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Treat all and comma-separated namespaces as the same case. Transform them into a slice of string and then loop over it.


for _, namespace := range namespaces {
namespaceCtx := contextutils.WithNamespace(ctx, namespace)
logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", namespace)

if err := g.deleteWorkflowsForNamespace(ctx, namespace, s); err != nil {
g.metrics.gcRoundFailure.Inc(namespaceCtx)
logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", n.GetName(), err)
logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", namespace, err)

Check warning on line 70 in flytepropeller/pkg/controller/garbage_collector.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/garbage_collector.go#L70

Added line #L70 was not covered by tests
} else {
g.metrics.gcRoundSuccess.Inc(namespaceCtx)
}
Expand All @@ -81,19 +92,30 @@
s.MatchExpressions = append(s.MatchExpressions, g.labelSelectorRequirements...)
}

// Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each.
if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" {
// Delete doesn't support 'all' namespaces and comma-separated namespaces. Let's fetch namespaces and loop over each.
if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" || strings.Contains(g.namespace, ",") {
namespaceList, err := g.namespaceClient.List(ctx, v1.ListOptions{})
if err != nil {
return err
}
for _, n := range namespaceList.Items {
namespaceCtx := contextutils.WithNamespace(ctx, n.GetName())
logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", n.GetName())

if err := g.deleteWorkflowsForNamespace(ctx, n.GetName(), s); err != nil {
var namespaces []string
if strings.Contains(g.namespace, ",") {
namespaces = strings.Split(g.namespace, ",")

Check warning on line 104 in flytepropeller/pkg/controller/garbage_collector.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/garbage_collector.go#L104

Added line #L104 was not covered by tests
} else {
namespaces = make([]string, 0)
for _, n := range namespaceList.Items {
namespaces = append(namespaces, n.GetName())
}
}
Comment on lines +103 to +110
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above


for _, namespace := range namespaces {
namespaceCtx := contextutils.WithNamespace(ctx, namespace)
logger.Infof(namespaceCtx, "Triggering Workflow delete for namespace: [%s]", namespace)

if err := g.deleteWorkflowsForNamespace(ctx, namespace, s); err != nil {
g.metrics.gcRoundFailure.Inc(namespaceCtx)
logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", n.GetName(), err)
logger.Errorf(namespaceCtx, "Garbage collection failed for for namespace: [%s]. Error : [%v]", namespace, err)

Check warning on line 118 in flytepropeller/pkg/controller/garbage_collector.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/garbage_collector.go#L118

Added line #L118 was not covered by tests
} else {
g.metrics.gcRoundSuccess.Inc(namespaceCtx)
}
Expand Down
Loading