Skip to content

Commit

Permalink
change name to 'writeFilter'
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Jan 19, 2024
1 parent 3f5f707 commit 0fc75c4
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions flytepropeller/pkg/controller/executors/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var NewCache = func(config *rest.Config, options cache.Options) (cache.Cache, er
return otelutils.WrapK8sCache(k8sCache), nil
}

func BuildNewClientFunc(writeThroughFilterSize int, scope promutils.Scope) func(config *rest.Config, options client.Options) (client.Client, error) {
func BuildNewClientFunc(writeFilterSize int, scope promutils.Scope) func(config *rest.Config, options client.Options) (client.Client, error) {
return func(config *rest.Config, options client.Options) (client.Client, error) {
var cacheReader client.Reader
cachelessOptions := options
Expand All @@ -47,23 +47,23 @@ func BuildNewClientFunc(writeThroughFilterSize int, scope promutils.Scope) func(
return k8sClient, err
}

filter, err := fastcheck.NewOppoBloomFilter(writeThroughFilterSize, scope.NewSubScope("kube_filter"))
filter, err := fastcheck.NewOppoBloomFilter(writeFilterSize, scope.NewSubScope("kube_filter"))
if err != nil {
return nil, err
}

return flyteK8sClient{
Client: k8sClient,
cacheReader: cacheReader,
writeThroughFilter: filter,
writeFilter: filter,
}, nil
}
}

type flyteK8sClient struct {
client.Client
cacheReader client.Reader
writeThroughFilter fastcheck.Filter
writeFilter fastcheck.Filter
}

func (f flyteK8sClient) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) {
Expand Down Expand Up @@ -91,14 +91,14 @@ func (f flyteK8sClient) List(ctx context.Context, list client.ObjectList, opts .
func (f flyteK8sClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
// "c" represents create
id := idFromObject(obj, "c")
if f.writeThroughFilter.Contains(ctx, id) {
if f.writeFilter.Contains(ctx, id) {
return nil
}
err := f.Client.Create(ctx, obj, opts...)
if err != nil {
return err
}
f.writeThroughFilter.Add(ctx, id)
f.writeFilter.Add(ctx, id)
return nil
}

Expand All @@ -107,14 +107,14 @@ func (f flyteK8sClient) Create(ctx context.Context, obj client.Object, opts ...c
func (f flyteK8sClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
// "d" represents delete
id := idFromObject(obj, "d")
if f.writeThroughFilter.Contains(ctx, id) {
if f.writeFilter.Contains(ctx, id) {
return nil
}
err := f.Client.Delete(ctx, obj, opts...)
if err != nil {
return err
}
f.writeThroughFilter.Add(ctx, id)
f.writeFilter.Add(ctx, id)
return nil
}

Expand Down

0 comments on commit 0fc75c4

Please sign in to comment.