Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix support for limit-namespace in FlytePropeller #5238

Merged
merged 3 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 26 additions & 32 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache"

"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
Expand All @@ -33,8 +33,8 @@ type ResourceLevelMonitor struct {
// to monitor current levels.
Levels *labeled.Gauge

// This informer will be used to get a list of the underlying objects that we want a tally of
sharedInformer cache.SharedIndexInformer
// cache is used to retrieve the object lists
cache cache.Cache

// The kind here will be used to differentiate all the metrics, we'll leave out group and version for now
gvk schema.GroupVersionKind
Expand All @@ -45,36 +45,30 @@ type ResourceLevelMonitor struct {
once sync.Once
}

// The reason that we use namespace as the one and only thing to cut by is because it's the feature that we are sure that any
// K8s resource created by a plugin will have (as yet, Flyte doesn't have any plugins that create cluster level resources and
// it probably won't for a long time). We can't assume that all the operators and CRDs that Flyte will ever work with will have
// the exact same set of labels or annotations or owner references. The only thing we can really count on is namespace.
func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interface{}) map[string]int {
// Map of namespace to counts
counts := map[string]int{}

// Collect the object counts by namespace
for _, v := range objects {
metadata, err := meta.Accessor(v)
if err != nil {
logger.Errorf(ctx, "Error converting obj %v to an Accessor %s\n", v, err)
continue
}
counts[metadata.GetNamespace()]++
}

return counts
}

// The context here is expected to already have a value for the KindKey
func (r *ResourceLevelMonitor) collect(ctx context.Context) {
// Emit gauges at the namespace layer - since these are just K8s resources, we cannot be guaranteed to have the necessary
// information to derive project/domain
objects := r.sharedInformer.GetStore().List()
counts := r.countList(ctx, objects)
list := metav1.PartialObjectMetadataList{
TypeMeta: metav1.TypeMeta{
Kind: r.gvk.Kind,
APIVersion: r.gvk.Version,
},
}
if err := r.cache.List(ctx, &list); err != nil {
logger.Warnf(ctx, "failed to list objects for %s.%s: %v", r.gvk.Kind, r.gvk.Version, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hamersaw I'm seeing this log line a lot since we upgraded to 1.12 (45k times in 5 days). For instance:

failed to list objects for PyTorchJob.v1: no matches for kind "PyTorchJob" in version "v1"

The CRD does exist in the cluster:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  ...
  name: pytorchjobs.kubeflow.org
spec:
  group: kubeflow.org
  names:
    kind: PyTorchJob
    listKind: PyTorchJobList
    plural: pytorchjobs
    singular: pytorchjob
  scope: Namespaced
  versions:
  - ...
    name: v1

Tasks using the pytorch plugin also work.

Do you know what the reason could be for the cache not being aware of the pytorchjob crd?

return
}

// aggregate the object counts by namespace
namespaceCounts := map[string]int{}
for _, item := range list.Items {
namespaceCounts[item.GetNamespace()]++
}

for ns, count := range counts {
withNamespaceCtx := contextutils.WithNamespace(ctx, ns)
// emit namespace object count metrics
for namespace, count := range namespaceCounts {
withNamespaceCtx := contextutils.WithNamespace(ctx, namespace)
r.Levels.Set(withNamespaceCtx, float64(count))
}
}
Expand All @@ -89,7 +83,7 @@ func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) {
go func() {
defer ticker.Stop()
pprof.SetGoroutineLabels(collectorCtx)
r.sharedInformer.HasSynced()
r.cache.WaitForCacheSync(collectorCtx)
logger.Infof(ctx, "K8s resource collector %s has synced", r.gvk.Kind)
for {
select {
Expand Down Expand Up @@ -125,7 +119,7 @@ type ResourceMonitorIndex struct {
stopwatches map[promutils.Scope]*labeled.StopWatch
}

func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, si cache.SharedIndexInformer,
func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, cache cache.Cache,
gvk schema.GroupVersionKind) *ResourceLevelMonitor {

logger.Infof(ctx, "Attempting to create K8s gauge emitter for kind %s/%s", gvk.Version, gvk.Kind)
Expand Down Expand Up @@ -157,7 +151,7 @@ func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Conte
Scope: scope,
CollectorTimer: r.stopwatches[scope],
Levels: r.gauges[scope],
sharedInformer: si,
cache: cache,
gvk: gvk,
}
r.monitors[gvk] = rm
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flyteorg/flyte/flytestdlib/promutils"
)
Expand All @@ -37,34 +38,29 @@ var pods = []interface{}{
},
}

func TestNewResourceLevelMonitor(t *testing.T) {
x := v1.Pod{}
x.GetObjectMeta()
lm := ResourceLevelMonitor{}
res := lm.countList(context.Background(), pods)
assert.Equal(t, 2, res["ns-a"])
assert.Equal(t, 1, res["ns-b"])
type MyFakeCache struct {
cache.Cache
}

type MyFakeInformer struct {
cache.SharedIndexInformer
store cache.Store
}

func (m MyFakeInformer) GetStore() cache.Store {
return m.store
}
func (m MyFakeCache) List(_ context.Context, list client.ObjectList, _ ...client.ListOption) error {
objectMetadataList, ok := list.(*metav1.PartialObjectMetadataList)
if !ok {
return fmt.Errorf("unexpected type %T", list)
}

func (m MyFakeInformer) HasSynced() bool {
return true
}
objectMetadataList.Items = make([]metav1.PartialObjectMetadata, 0)
for _, pod := range pods {
objectMetadataList.Items = append(objectMetadataList.Items, metav1.PartialObjectMetadata{
TypeMeta: objectMetadataList.TypeMeta,
ObjectMeta: pod.(*v1.Pod).ObjectMeta,
})
}

type MyFakeStore struct {
cache.Store
return nil
}

func (m MyFakeStore) List() []interface{} {
return pods
func (m MyFakeCache) WaitForCacheSync(_ context.Context) bool {
return true
}

func TestResourceLevelMonitor_collect(t *testing.T) {
Expand All @@ -73,12 +69,10 @@ func TestResourceLevelMonitor_collect(t *testing.T) {

kinds, _, err := scheme.Scheme.ObjectKinds(&v1.Pod{})
assert.NoError(t, err)
myInformer := MyFakeInformer{
store: MyFakeStore{},
}
myCache := MyFakeCache{}

index := NewResourceMonitorIndex()
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0])
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myCache, kinds[0])
rm.collect(ctx)

var expected = `
Expand All @@ -98,14 +92,11 @@ func TestResourceLevelMonitorSingletonness(t *testing.T) {

kinds, _, err := scheme.Scheme.ObjectKinds(&v1.Pod{})
assert.NoError(t, err)
myInformer := MyFakeInformer{
store: MyFakeStore{},
}
myCache := MyFakeCache{}

index := NewResourceMonitorIndex()
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0])
fmt.Println(rm)
//rm2 := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0])
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myCache, kinds[0])
rm2 := index.GetOrCreateResourceLevelMonitor(ctx, scope, myCache, kinds[0])

//assert.Equal(t, rm, rm2)
assert.Equal(t, rm, rm2)
}
23 changes: 2 additions & 21 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package k8s
import (
"context"
"fmt"
"reflect"
"time"

"golang.org/x/time/rate"
Expand All @@ -16,7 +15,6 @@ import (
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down Expand Up @@ -641,11 +639,8 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
}

// Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on
pluginInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch)
if err != nil {
return nil, err
}
rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, pluginInformer, gvk)
rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, kubeClient.GetCache(), gvk)

// Start the poller and gauge emitter
rm.RunCollectorOnce(ctx)

Expand All @@ -667,17 +662,3 @@ func getPluginGvk(resourceToWatch runtime.Object) (schema.GroupVersionKind, erro
}
return kinds[0], nil
}

func getPluginSharedInformer(ctx context.Context, kubeClient pluginsCore.KubeClient, resourceToWatch client.Object) (cache.SharedIndexInformer, error) {
i, err := kubeClient.GetCache().GetInformer(ctx, resourceToWatch)
if err != nil {
return nil, errors.Wrapf(errors.PluginInitializationFailed, err, "Error getting informer for %s", reflect.TypeOf(i))
}

si, casted := i.(cache.SharedIndexInformer)
if !casted {
return nil, errors.Errorf(errors.PluginInitializationFailed, "wrong type. Actual: %v", reflect.TypeOf(i))
}

return si, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -991,10 +991,7 @@ func TestResourceManagerConstruction(t *testing.T) {
gvk, err := getPluginGvk(&v1.Pod{})
assert.NoError(t, err)
assert.Equal(t, gvk.Kind, "Pod")
si, err := getPluginSharedInformer(ctx, fakeKubeClient, &v1.Pod{})
assert.NotNil(t, si)
assert.NoError(t, err)
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, si, gvk)
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, fakeKubeClient.GetCache(), gvk)
assert.NotNil(t, rm)
}

Expand Down
Loading