Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed orphaned kube exectors from FlytePropeller #4722

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions flytepropeller/cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (

"github.com/flyteorg/flyte/flytepropeller/pkg/controller"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flyte/flytepropeller/pkg/signals"
"github.com/flyteorg/flyte/flytepropeller/pkg/webhook"
webhookConfig "github.com/flyteorg/flyte/flytepropeller/pkg/webhook/config"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/otelutils"
"github.com/flyteorg/flyte/flytestdlib/profutils"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
Expand Down Expand Up @@ -111,7 +111,12 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
DefaultNamespaces: namespaceConfigs,
},
NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
return executors.NewFallbackClientBuilder(webhookScope).Build(nil, config, options)
k8sClient, err := client.New(config, options)
if err != nil {
return k8sClient, err
}

return otelutils.WrapK8sClient(k8sClient), nil
},
Metrics: metricsserver.Options{
// Disable metrics serving
Expand Down
81 changes: 0 additions & 81 deletions flytepropeller/pkg/controller/executors/kube.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
package executors

import (
"context"
"fmt"

"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flyteorg/flyte/flytestdlib/fastcheck"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

//go:generate mockery -name Client -case=underscore
Expand All @@ -22,77 +15,3 @@ type Client interface {
// GetCache returns a cache.Cache
GetCache() cache.Cache
}

// ClientBuilder builder is the interface for the client builder.
type ClientBuilder interface {
// Build returns a new client.
Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error)
}

type FallbackClientBuilder struct {
scope promutils.Scope
}

func (f *FallbackClientBuilder) Build(_ cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
return client.New(config, options)
}

// NewFallbackClientBuilder Creates a new k8s client that uses the cached client for reads and falls back to making API
// calls if it failed. Write calls will always go to raw client directly.
func NewFallbackClientBuilder(scope promutils.Scope) *FallbackClientBuilder {
return &FallbackClientBuilder{
scope: scope,
}
}

type writeThroughCachingWriter struct {
client.Client
filter fastcheck.Filter
}

func IDFromObject(obj client.Object, op string) []byte {
return []byte(fmt.Sprintf("%s:%s:%s:%s", obj.GetObjectKind().GroupVersionKind().String(), obj.GetNamespace(), obj.GetName(), op))
}

// Create first checks the local cache if the object with id was previously successfully saved, if not then
// saves the object obj in the Kubernetes cluster
func (w writeThroughCachingWriter) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
// "c" represents create
id := IDFromObject(obj, "c")
if w.filter.Contains(ctx, id) {
return nil
}
err := w.Client.Create(ctx, obj, opts...)
if err != nil {
return err
}
w.filter.Add(ctx, id)
return nil
}

// Delete first checks the local cache if the object with id was previously successfully deleted, if not then
// deletes the given obj from Kubernetes cluster.
func (w writeThroughCachingWriter) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
// "d" represents delete
id := IDFromObject(obj, "d")
if w.filter.Contains(ctx, id) {
return nil
}
err := w.Client.Delete(ctx, obj, opts...)
if err != nil {
return err
}
w.filter.Add(ctx, id)
return nil
}

func newWriteThroughCachingWriter(c client.Client, cacheSize int, scope promutils.Scope) (writeThroughCachingWriter, error) {
filter, err := fastcheck.NewOppoBloomFilter(cacheSize, scope.NewSubScope("kube_filter"))
if err != nil {
return writeThroughCachingWriter{}, err
}
return writeThroughCachingWriter{
Client: c,
filter: filter,
}, nil
}
131 changes: 0 additions & 131 deletions flytepropeller/pkg/controller/executors/kube_test.go

This file was deleted.

100 changes: 0 additions & 100 deletions flytepropeller/pkg/controller/executors/mocks/client_builder.go

This file was deleted.

Loading