Skip to content

Commit

Permalink
removed kube exectors
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Jan 12, 2024
1 parent 32cde08 commit 91a6390
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 314 deletions.
9 changes: 7 additions & 2 deletions flytepropeller/cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (

"github.com/flyteorg/flyte/flytepropeller/pkg/controller"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flyte/flytepropeller/pkg/signals"
"github.com/flyteorg/flyte/flytepropeller/pkg/webhook"
webhookConfig "github.com/flyteorg/flyte/flytepropeller/pkg/webhook/config"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/otelutils"
"github.com/flyteorg/flyte/flytestdlib/profutils"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
Expand Down Expand Up @@ -111,7 +111,12 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
DefaultNamespaces: namespaceConfigs,
},
NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
return executors.NewFallbackClientBuilder(webhookScope).Build(nil, config, options)
k8sClient, err := client.New(config, options)
if err != nil {
return k8sClient, err
}

return otelutils.WrapK8sClient(k8sClient), nil
},
Metrics: metricsserver.Options{
// Disable metrics serving
Expand Down
81 changes: 0 additions & 81 deletions flytepropeller/pkg/controller/executors/kube.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
package executors

import (
"context"
"fmt"

"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flyteorg/flyte/flytestdlib/fastcheck"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

//go:generate mockery -name Client -case=underscore
Expand All @@ -22,77 +15,3 @@ type Client interface {
// GetCache returns a cache.Cache
GetCache() cache.Cache
}

// ClientBuilder builder is the interface for the client builder.
type ClientBuilder interface {
// Build returns a new client.
Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error)
}

type FallbackClientBuilder struct {
scope promutils.Scope
}

func (f *FallbackClientBuilder) Build(_ cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
return client.New(config, options)
}

// NewFallbackClientBuilder Creates a new k8s client that uses the cached client for reads and falls back to making API
// calls if it failed. Write calls will always go to raw client directly.
func NewFallbackClientBuilder(scope promutils.Scope) *FallbackClientBuilder {
return &FallbackClientBuilder{
scope: scope,
}
}

type writeThroughCachingWriter struct {
client.Client
filter fastcheck.Filter
}

func IDFromObject(obj client.Object, op string) []byte {
return []byte(fmt.Sprintf("%s:%s:%s:%s", obj.GetObjectKind().GroupVersionKind().String(), obj.GetNamespace(), obj.GetName(), op))
}

// Create first checks the local cache if the object with id was previously successfully saved, if not then
// saves the object obj in the Kubernetes cluster
func (w writeThroughCachingWriter) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
// "c" represents create
id := IDFromObject(obj, "c")
if w.filter.Contains(ctx, id) {
return nil
}
err := w.Client.Create(ctx, obj, opts...)
if err != nil {
return err
}
w.filter.Add(ctx, id)
return nil
}

// Delete first checks the local cache if the object with id was previously successfully deleted, if not then
// deletes the given obj from Kubernetes cluster.
func (w writeThroughCachingWriter) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
// "d" represents delete
id := IDFromObject(obj, "d")
if w.filter.Contains(ctx, id) {
return nil
}
err := w.Client.Delete(ctx, obj, opts...)
if err != nil {
return err
}
w.filter.Add(ctx, id)
return nil
}

func newWriteThroughCachingWriter(c client.Client, cacheSize int, scope promutils.Scope) (writeThroughCachingWriter, error) {
filter, err := fastcheck.NewOppoBloomFilter(cacheSize, scope.NewSubScope("kube_filter"))
if err != nil {
return writeThroughCachingWriter{}, err
}
return writeThroughCachingWriter{
Client: c,
filter: filter,
}, nil
}
131 changes: 0 additions & 131 deletions flytepropeller/pkg/controller/executors/kube_test.go

This file was deleted.

100 changes: 0 additions & 100 deletions flytepropeller/pkg/controller/executors/mocks/client_builder.go

This file was deleted.

0 comments on commit 91a6390

Please sign in to comment.