Skip to content

Commit

Permalink
wrapping k8s client with write filter and cache reader
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Jan 19, 2024
1 parent a62743d commit 3f5f707
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 63 deletions.
2 changes: 1 addition & 1 deletion cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func startPropeller(ctx context.Context, cfg Propeller) error {
DefaultNamespaces: namespaceConfigs,
},
NewCache: executors.NewCache,
NewClient: executors.NewClient,
NewClient: executors.BuildNewClientFunc(50000, propellerScope),
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error {
DefaultNamespaces: namespaceConfigs,
},
NewCache: executors.NewCache,
NewClient: executors.NewClient,
NewClient: executors.BuildNewClientFunc(50000, propellerScope),
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
DefaultNamespaces: namespaceConfigs,
},
NewCache: executors.NewCache,
NewClient: executors.NewClient,
NewClient: executors.BuildNewClientFunc(50000, propellerScope),
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
Expand Down
104 changes: 44 additions & 60 deletions flytepropeller/pkg/controller/executors/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,107 +33,91 @@ var NewCache = func(config *rest.Config, options cache.Options) (cache.Cache, er
return otelutils.WrapK8sCache(k8sCache), nil
}

var NewClient = func(config *rest.Config, options client.Options) (client.Client, error) {
var reader *fallbackClientReader
if options.Cache != nil && options.Cache.Reader != nil {
// if caching is enabled we create a fallback reader so we can attempt the client if the cache
// reader does not have the object
reader = &fallbackClientReader{
orderedClients: []client.Reader{options.Cache.Reader},
func BuildNewClientFunc(writeThroughFilterSize int, scope promutils.Scope) func(config *rest.Config, options client.Options) (client.Client, error) {
return func(config *rest.Config, options client.Options) (client.Client, error) {
var cacheReader client.Reader
cachelessOptions := options
if options.Cache != nil && options.Cache.Reader != nil {
cacheReader = options.Cache.Reader
cachelessOptions.Cache = nil
}

options.Cache.Reader = reader
}
k8sClient, err := client.New(config, cachelessOptions)
if err != nil {
return k8sClient, err
}

// create the k8s client
k8sClient, err := client.New(config, options)
if err != nil {
return k8sClient, err
}
filter, err := fastcheck.NewOppoBloomFilter(writeThroughFilterSize, scope.NewSubScope("kube_filter"))
if err != nil {
return nil, err
}

k8sOtelClient := otelutils.WrapK8sClient(k8sClient)
if reader != nil {
// once the k8s client is created we set the fallback reader's client to the k8s client
reader.orderedClients = append(reader.orderedClients, k8sOtelClient)
return flyteK8sClient{
Client: k8sClient,
cacheReader: cacheReader,
writeThroughFilter: filter,
}, nil
}

return k8sOtelClient, nil
}

// fallbackClientReader reads from the cache first and if not found then reads from the configured reader, which
// directly reads from the API
type fallbackClientReader struct {
orderedClients []client.Reader
type flyteK8sClient struct {
client.Client
cacheReader client.Reader
writeThroughFilter fastcheck.Filter
}

func (c fallbackClientReader) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) {
for _, k8sClient := range c.orderedClients {
if err = k8sClient.Get(ctx, key, out, opts...); err == nil {
func (f flyteK8sClient) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) {
if f.cacheReader != nil {
if err = f.cacheReader.Get(ctx, key, out, opts...); err == nil {
return nil
}
}

return
return f.Client.Get(ctx, key, out, opts...)
}

func (c fallbackClientReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) {
for _, k8sClient := range c.orderedClients {
if err = k8sClient.List(ctx, list, opts...); err == nil {
func (f flyteK8sClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) {
if f.cacheReader != nil {
if err = f.cacheReader.List(ctx, list, opts...); err == nil {
return nil
}
}

return
}

type writeThroughCachingWriter struct {
client.Client
filter fastcheck.Filter
}

func IDFromObject(obj client.Object, op string) []byte {
return []byte(fmt.Sprintf("%s:%s:%s:%s", obj.GetObjectKind().GroupVersionKind().String(), obj.GetNamespace(), obj.GetName(), op))
return f.Client.List(ctx, list, opts...)
}

// Create first checks the local cache if the object with id was previously successfully saved, if not then
// saves the object obj in the Kubernetes cluster
func (w writeThroughCachingWriter) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
func (f flyteK8sClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
// "c" represents create
id := IDFromObject(obj, "c")
if w.filter.Contains(ctx, id) {
id := idFromObject(obj, "c")
if f.writeThroughFilter.Contains(ctx, id) {
return nil
}
err := w.Client.Create(ctx, obj, opts...)
err := f.Client.Create(ctx, obj, opts...)
if err != nil {
return err
}
w.filter.Add(ctx, id)
f.writeThroughFilter.Add(ctx, id)
return nil
}

// Delete first checks the local cache if the object with id was previously successfully deleted, if not then
// deletes the given obj from Kubernetes cluster.
func (w writeThroughCachingWriter) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
func (f flyteK8sClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
// "d" represents delete
id := IDFromObject(obj, "d")
if w.filter.Contains(ctx, id) {
id := idFromObject(obj, "d")
if f.writeThroughFilter.Contains(ctx, id) {
return nil
}
err := w.Client.Delete(ctx, obj, opts...)
err := f.Client.Delete(ctx, obj, opts...)
if err != nil {
return err
}
w.filter.Add(ctx, id)
f.writeThroughFilter.Add(ctx, id)
return nil
}

func newWriteThroughCachingWriter(c client.Client, cacheSize int, scope promutils.Scope) (writeThroughCachingWriter, error) {
filter, err := fastcheck.NewOppoBloomFilter(cacheSize, scope.NewSubScope("kube_filter"))
if err != nil {
return writeThroughCachingWriter{}, err
}
return writeThroughCachingWriter{
Client: c,
filter: filter,
}, nil
func idFromObject(obj client.Object, op string) []byte {
return []byte(fmt.Sprintf("%s:%s:%s:%s", obj.GetObjectKind().GroupVersionKind().String(), obj.GetNamespace(), obj.GetName(), op))
}

0 comments on commit 3f5f707

Please sign in to comment.