From 91a6390163a60d26dcdc7f5b7ff1002cd498120b Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Fri, 12 Jan 2024 10:13:53 -0600 Subject: [PATCH] removed kube exectors Signed-off-by: Daniel Rammer --- flytepropeller/cmd/controller/cmd/webhook.go | 9 +- .../pkg/controller/executors/kube.go | 81 ----------- .../pkg/controller/executors/kube_test.go | 131 ------------------ .../executors/mocks/client_builder.go | 100 ------------- 4 files changed, 7 insertions(+), 314 deletions(-) delete mode 100644 flytepropeller/pkg/controller/executors/kube_test.go delete mode 100644 flytepropeller/pkg/controller/executors/mocks/client_builder.go diff --git a/flytepropeller/cmd/controller/cmd/webhook.go b/flytepropeller/cmd/controller/cmd/webhook.go index f34f21d12c..c7d1576d55 100644 --- a/flytepropeller/cmd/controller/cmd/webhook.go +++ b/flytepropeller/cmd/controller/cmd/webhook.go @@ -14,12 +14,12 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/controller" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" - "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" "github.com/flyteorg/flyte/flytepropeller/pkg/signals" "github.com/flyteorg/flyte/flytepropeller/pkg/webhook" webhookConfig "github.com/flyteorg/flyte/flytepropeller/pkg/webhook/config" "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/otelutils" "github.com/flyteorg/flyte/flytestdlib/profutils" "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" @@ -111,7 +111,12 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w DefaultNamespaces: namespaceConfigs, }, NewClient: func(config *rest.Config, options client.Options) (client.Client, error) { - return executors.NewFallbackClientBuilder(webhookScope).Build(nil, config, options) + k8sClient, err := client.New(config, options) + if err != nil { + return k8sClient, err + } + + return otelutils.WrapK8sClient(k8sClient), nil }, Metrics: metricsserver.Options{ // Disable metrics serving diff --git a/flytepropeller/pkg/controller/executors/kube.go b/flytepropeller/pkg/controller/executors/kube.go index acd4f5c4f3..3cb546e03d 100644 --- a/flytepropeller/pkg/controller/executors/kube.go +++ b/flytepropeller/pkg/controller/executors/kube.go @@ -1,15 +1,8 @@ package executors import ( - "context" - "fmt" - - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/flyteorg/flyte/flytestdlib/fastcheck" - "github.com/flyteorg/flyte/flytestdlib/promutils" ) //go:generate mockery -name Client -case=underscore @@ -22,77 +15,3 @@ type Client interface { // GetCache returns a cache.Cache GetCache() cache.Cache } - -// ClientBuilder builder is the interface for the client builder. -type ClientBuilder interface { - // Build returns a new client. - Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) -} - -type FallbackClientBuilder struct { - scope promutils.Scope -} - -func (f *FallbackClientBuilder) Build(_ cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { - return client.New(config, options) -} - -// NewFallbackClientBuilder Creates a new k8s client that uses the cached client for reads and falls back to making API -// calls if it failed. Write calls will always go to raw client directly. -func NewFallbackClientBuilder(scope promutils.Scope) *FallbackClientBuilder { - return &FallbackClientBuilder{ - scope: scope, - } -} - -type writeThroughCachingWriter struct { - client.Client - filter fastcheck.Filter -} - -func IDFromObject(obj client.Object, op string) []byte { - return []byte(fmt.Sprintf("%s:%s:%s:%s", obj.GetObjectKind().GroupVersionKind().String(), obj.GetNamespace(), obj.GetName(), op)) -} - -// Create first checks the local cache if the object with id was previously successfully saved, if not then -// saves the object obj in the Kubernetes cluster -func (w writeThroughCachingWriter) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { - // "c" represents create - id := IDFromObject(obj, "c") - if w.filter.Contains(ctx, id) { - return nil - } - err := w.Client.Create(ctx, obj, opts...) - if err != nil { - return err - } - w.filter.Add(ctx, id) - return nil -} - -// Delete first checks the local cache if the object with id was previously successfully deleted, if not then -// deletes the given obj from Kubernetes cluster. -func (w writeThroughCachingWriter) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { - // "d" represents delete - id := IDFromObject(obj, "d") - if w.filter.Contains(ctx, id) { - return nil - } - err := w.Client.Delete(ctx, obj, opts...) - if err != nil { - return err - } - w.filter.Add(ctx, id) - return nil -} - -func newWriteThroughCachingWriter(c client.Client, cacheSize int, scope promutils.Scope) (writeThroughCachingWriter, error) { - filter, err := fastcheck.NewOppoBloomFilter(cacheSize, scope.NewSubScope("kube_filter")) - if err != nil { - return writeThroughCachingWriter{}, err - } - return writeThroughCachingWriter{ - Client: c, - filter: filter, - }, nil -} diff --git a/flytepropeller/pkg/controller/executors/kube_test.go b/flytepropeller/pkg/controller/executors/kube_test.go deleted file mode 100644 index bcaa64ff6f..0000000000 --- a/flytepropeller/pkg/controller/executors/kube_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package executors - -import ( - "context" - "fmt" - "reflect" - "testing" - - "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/flyteorg/flyte/flytestdlib/contextutils" - "github.com/flyteorg/flyte/flytestdlib/promutils" - "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" -) - -func TestIdFromObject(t *testing.T) { - type args struct { - ns string - name string - kind string - op string - } - tests := []struct { - name string - args args - want string - }{ - {"default", args{"default", "name", "pod", "c"}, "/v1, Kind=pod:default:name:c"}, - {"no-cluster", args{"my-ns", "name", "pod", "c"}, "/v1, Kind=pod:my-ns:name:c"}, - {"differ-oper", args{"default", "name", "pod", "d"}, "/v1, Kind=pod:default:name:d"}, - {"withcluster", args{"default", "name", "pod", "d"}, "/v1, Kind=pod:default:name:d"}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - p := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: tt.args.ns, - Name: tt.args.name, - }, - TypeMeta: metav1.TypeMeta{ - Kind: tt.args.kind, - APIVersion: "v1", - }, - } - if got := IDFromObject(p, tt.args.op); !reflect.DeepEqual(got, []byte(tt.want)) { - t.Errorf("IDFromObject() = %s, want %s", string(got), tt.want) - } - }) - } -} - -type singleInvokeClient struct { - client.Client - createCalled bool - deleteCalled bool -} - -func (f *singleInvokeClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { - if f.createCalled { - return fmt.Errorf("create called more than once") - } - f.createCalled = true - return nil -} - -func (f *singleInvokeClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { - if f.deleteCalled { - return fmt.Errorf("delete called more than once") - } - f.deleteCalled = true - return nil -} - -func TestWriteThroughCachingWriter_Create(t *testing.T) { - ctx := context.TODO() - c := &singleInvokeClient{} - w, err := newWriteThroughCachingWriter(c, 1000, promutils.NewTestScope()) - assert.NoError(t, err) - - p := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns", - Name: "name", - }, - TypeMeta: metav1.TypeMeta{ - Kind: "pod", - APIVersion: "v1", - }, - } - - err = w.Create(ctx, p) - assert.NoError(t, err) - - assert.True(t, c.createCalled) - - err = w.Create(ctx, p) - assert.NoError(t, err) -} - -func TestWriteThroughCachingWriter_Delete(t *testing.T) { - ctx := context.TODO() - c := &singleInvokeClient{} - w, err := newWriteThroughCachingWriter(c, 1000, promutils.NewTestScope()) - assert.NoError(t, err) - - p := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns", - Name: "name", - }, - TypeMeta: metav1.TypeMeta{ - Kind: "pod", - APIVersion: "v1", - }, - } - - err = w.Delete(ctx, p) - assert.NoError(t, err) - - assert.True(t, c.deleteCalled) - - err = w.Delete(ctx, p) - assert.NoError(t, err) -} - -func init() { - labeled.SetMetricKeys(contextutils.ExecIDKey) -} diff --git a/flytepropeller/pkg/controller/executors/mocks/client_builder.go b/flytepropeller/pkg/controller/executors/mocks/client_builder.go deleted file mode 100644 index 3180f480fd..0000000000 --- a/flytepropeller/pkg/controller/executors/mocks/client_builder.go +++ /dev/null @@ -1,100 +0,0 @@ -// Code generated by mockery v1.0.1. DO NOT EDIT. - -package mocks - -import ( - cache "sigs.k8s.io/controller-runtime/pkg/cache" - client "sigs.k8s.io/controller-runtime/pkg/client" - - executors "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" - - mock "github.com/stretchr/testify/mock" - - rest "k8s.io/client-go/rest" -) - -// ClientBuilder is an autogenerated mock type for the ClientBuilder type -type ClientBuilder struct { - mock.Mock -} - -type ClientBuilder_Build struct { - *mock.Call -} - -func (_m ClientBuilder_Build) Return(_a0 client.Client, _a1 error) *ClientBuilder_Build { - return &ClientBuilder_Build{Call: _m.Call.Return(_a0, _a1)} -} - -func (_m *ClientBuilder) OnBuild(_a0 cache.Cache, config *rest.Config, options client.Options) *ClientBuilder_Build { - c_call := _m.On("Build", _a0, config, options) - return &ClientBuilder_Build{Call: c_call} -} - -func (_m *ClientBuilder) OnBuildMatch(matchers ...interface{}) *ClientBuilder_Build { - c_call := _m.On("Build", matchers...) - return &ClientBuilder_Build{Call: c_call} -} - -// Build provides a mock function with given fields: _a0, config, options -func (_m *ClientBuilder) Build(_a0 cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { - ret := _m.Called(_a0, config, options) - - var r0 client.Client - if rf, ok := ret.Get(0).(func(cache.Cache, *rest.Config, client.Options) client.Client); ok { - r0 = rf(_a0, config, options) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(client.Client) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(cache.Cache, *rest.Config, client.Options) error); ok { - r1 = rf(_a0, config, options) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -type ClientBuilder_WithUncached struct { - *mock.Call -} - -func (_m ClientBuilder_WithUncached) Return(_a0 executors.ClientBuilder) *ClientBuilder_WithUncached { - return &ClientBuilder_WithUncached{Call: _m.Call.Return(_a0)} -} - -func (_m *ClientBuilder) OnWithUncached(objs ...client.Object) *ClientBuilder_WithUncached { - c_call := _m.On("WithUncached", objs) - return &ClientBuilder_WithUncached{Call: c_call} -} - -func (_m *ClientBuilder) OnWithUncachedMatch(matchers ...interface{}) *ClientBuilder_WithUncached { - c_call := _m.On("WithUncached", matchers...) - return &ClientBuilder_WithUncached{Call: c_call} -} - -// WithUncached provides a mock function with given fields: objs -func (_m *ClientBuilder) WithUncached(objs ...client.Object) executors.ClientBuilder { - _va := make([]interface{}, len(objs)) - for _i := range objs { - _va[_i] = objs[_i] - } - var _ca []interface{} - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - var r0 executors.ClientBuilder - if rf, ok := ret.Get(0).(func(...client.Object) executors.ClientBuilder); ok { - r0 = rf(objs...) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(executors.ClientBuilder) - } - } - - return r0 -}