Fix support for limit-namespace in FlytePropeller (#5238)
Signed-off-by: Daniel Rammer <[email protected]>
hamersaw authored Apr 19, 2024
1 parent 2ca3111 commit e8588f3
Showing 4 changed files with 54 additions and 91 deletions.
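In short: the per-plugin resource collector used to pull a client-go SharedIndexInformer out of the controller-runtime cache and count the objects in its store. With the limit-namespace option set, the cache is namespace-scoped and that informer extraction breaks (see the note after the removed getPluginSharedInformer helper below), so the collector now lists metadata-only objects (metav1.PartialObjectMetadataList) directly from the controller-runtime cache and aggregates counts per namespace itself. The tests swap the fake informer/store pair for a fake cache accordingly.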
58 changes: 26 additions & 32 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go
@@ -7,9 +7,9 @@ import (
 	"sync"
 	"time"
 
-	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/client-go/tools/cache"
+	"sigs.k8s.io/controller-runtime/pkg/cache"
 
 	"github.com/flyteorg/flyte/flytestdlib/contextutils"
 	"github.com/flyteorg/flyte/flytestdlib/logger"
@@ -33,8 +33,8 @@ type ResourceLevelMonitor struct {
 	// to monitor current levels.
 	Levels *labeled.Gauge
 
-	// This informer will be used to get a list of the underlying objects that we want a tally of
-	sharedInformer cache.SharedIndexInformer
+	// cache is used to retrieve the object lists
+	cache cache.Cache
 
 	// The kind here will be used to differentiate all the metrics, we'll leave out group and version for now
 	gvk schema.GroupVersionKind
@@ -45,36 +45,30 @@ type ResourceLevelMonitor struct {
 	once sync.Once
 }
 
-// The reason that we use namespace as the one and only thing to cut by is because it's the feature that we are sure that any
-// K8s resource created by a plugin will have (as yet, Flyte doesn't have any plugins that create cluster level resources and
-// it probably won't for a long time). We can't assume that all the operators and CRDs that Flyte will ever work with will have
-// the exact same set of labels or annotations or owner references. The only thing we can really count on is namespace.
-func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interface{}) map[string]int {
-	// Map of namespace to counts
-	counts := map[string]int{}
-
-	// Collect the object counts by namespace
-	for _, v := range objects {
-		metadata, err := meta.Accessor(v)
-		if err != nil {
-			logger.Errorf(ctx, "Error converting obj %v to an Accessor %s\n", v, err)
-			continue
-		}
-		counts[metadata.GetNamespace()]++
-	}
-
-	return counts
-}
-
 // The context here is expected to already have a value for the KindKey
 func (r *ResourceLevelMonitor) collect(ctx context.Context) {
 	// Emit gauges at the namespace layer - since these are just K8s resources, we cannot be guaranteed to have the necessary
 	// information to derive project/domain
-	objects := r.sharedInformer.GetStore().List()
-	counts := r.countList(ctx, objects)
+	list := metav1.PartialObjectMetadataList{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       r.gvk.Kind,
+			APIVersion: r.gvk.Version,
+		},
+	}
+	if err := r.cache.List(ctx, &list); err != nil {
+		logger.Warnf(ctx, "failed to list objects for %s.%s: %v", r.gvk.Kind, r.gvk.Version, err)
+		return
+	}
 
-	for ns, count := range counts {
-		withNamespaceCtx := contextutils.WithNamespace(ctx, ns)
+	// aggregate the object counts by namespace
+	namespaceCounts := map[string]int{}
+	for _, item := range list.Items {
+		namespaceCounts[item.GetNamespace()]++
+	}
+
+	// emit namespace object count metrics
+	for namespace, count := range namespaceCounts {
+		withNamespaceCtx := contextutils.WithNamespace(ctx, namespace)
 		r.Levels.Set(withNamespaceCtx, float64(count))
 	}
 }
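As a side note, here is a minimal, self-contained sketch (not part of this commit) of the listing pattern the new collect uses; the package name, kubeconfig lookup, and Pod GVK are illustrative assumptions:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
)

func main() {
	ctx := context.Background()

	// Build a controller-runtime cache from the local kubeconfig. A
	// namespace-scoped cache (what limit-namespace produces) serves this
	// same List without any informer plumbing.
	cfg, err := config.GetConfig()
	if err != nil {
		panic(err)
	}
	c, err := cache.New(cfg, cache.Options{})
	if err != nil {
		panic(err)
	}
	go func() { _ = c.Start(ctx) }() // Start blocks, so run it in the background
	if !c.WaitForCacheSync(ctx) {
		panic("cache never synced")
	}

	// Seeding the TypeMeta tells the cache which informer to consult; only
	// object metadata comes back, never full objects.
	list := metav1.PartialObjectMetadataList{
		TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"},
	}
	if err := c.List(ctx, &list); err != nil {
		panic(err)
	}

	// Aggregate counts by namespace, mirroring collect() above.
	counts := map[string]int{}
	for _, item := range list.Items {
		counts[item.GetNamespace()]++
	}
	fmt.Println(counts)
}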
@@ -89,7 +83,7 @@ func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) {
 	go func() {
 		defer ticker.Stop()
 		pprof.SetGoroutineLabels(collectorCtx)
-		r.sharedInformer.HasSynced()
+		r.cache.WaitForCacheSync(collectorCtx)
 		logger.Infof(ctx, "K8s resource collector %s has synced", r.gvk.Kind)
 		for {
 			select {
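A subtle behavior fix rides along here: HasSynced() returns a bool that the old code discarded, so the goroutine never actually waited, and the "has synced" log line could fire before the store was populated. WaitForCacheSync blocks until the cache has synced (or the context is cancelled), making the log accurate.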
@@ -125,7 +119,7 @@ type ResourceMonitorIndex struct {
 	stopwatches map[promutils.Scope]*labeled.StopWatch
 }
 
-func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, si cache.SharedIndexInformer,
+func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, cache cache.Cache,
 	gvk schema.GroupVersionKind) *ResourceLevelMonitor {
 
 	logger.Infof(ctx, "Attempting to create K8s gauge emitter for kind %s/%s", gvk.Version, gvk.Kind)
@@ -157,7 +151,7 @@ func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Conte
 		Scope:          scope,
 		CollectorTimer: r.stopwatches[scope],
 		Levels:         r.gauges[scope],
-		sharedInformer: si,
+		cache:          cache,
 		gvk:            gvk,
 	}
 	r.monitors[gvk] = rm
flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector_test.go
@@ -11,7 +11,8 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes/scheme"
-	"k8s.io/client-go/tools/cache"
+	"sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/flyteorg/flyte/flytestdlib/promutils"
 )
@@ -37,34 +38,29 @@ var pods = []interface{}{
 	},
 }
 
-func TestNewResourceLevelMonitor(t *testing.T) {
-	x := v1.Pod{}
-	x.GetObjectMeta()
-	lm := ResourceLevelMonitor{}
-	res := lm.countList(context.Background(), pods)
-	assert.Equal(t, 2, res["ns-a"])
-	assert.Equal(t, 1, res["ns-b"])
+type MyFakeCache struct {
+	cache.Cache
 }
 
-type MyFakeInformer struct {
-	cache.SharedIndexInformer
-	store cache.Store
-}
-
-func (m MyFakeInformer) GetStore() cache.Store {
-	return m.store
-}
+func (m MyFakeCache) List(_ context.Context, list client.ObjectList, _ ...client.ListOption) error {
+	objectMetadataList, ok := list.(*metav1.PartialObjectMetadataList)
+	if !ok {
+		return fmt.Errorf("unexpected type %T", list)
+	}
 
-func (m MyFakeInformer) HasSynced() bool {
-	return true
-}
+	objectMetadataList.Items = make([]metav1.PartialObjectMetadata, 0)
+	for _, pod := range pods {
+		objectMetadataList.Items = append(objectMetadataList.Items, metav1.PartialObjectMetadata{
+			TypeMeta:   objectMetadataList.TypeMeta,
+			ObjectMeta: pod.(*v1.Pod).ObjectMeta,
+		})
+	}
 
-type MyFakeStore struct {
-	cache.Store
-}
+	return nil
+}
 
-func (m MyFakeStore) List() []interface{} {
-	return pods
+func (m MyFakeCache) WaitForCacheSync(_ context.Context) bool {
+	return true
 }
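The fake leans on Go interface embedding: the embedded cache.Cache is nil, which satisfies the interface at compile time while the test overrides only the two methods the collector calls, List and WaitForCacheSync. Calling any other method would panic on the nil embedded value, a useful tripwire if the code under test grows new cache dependencies.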

func TestResourceLevelMonitor_collect(t *testing.T) {
Expand All @@ -73,12 +69,10 @@ func TestResourceLevelMonitor_collect(t *testing.T) {

kinds, _, err := scheme.Scheme.ObjectKinds(&v1.Pod{})
assert.NoError(t, err)
myInformer := MyFakeInformer{
store: MyFakeStore{},
}
myCache := MyFakeCache{}

index := NewResourceMonitorIndex()
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0])
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myCache, kinds[0])
rm.collect(ctx)

var expected = `
@@ -98,14 +92,11 @@ func TestResourceLevelMonitorSingletonness(t *testing.T) {
 
 	kinds, _, err := scheme.Scheme.ObjectKinds(&v1.Pod{})
 	assert.NoError(t, err)
-	myInformer := MyFakeInformer{
-		store: MyFakeStore{},
-	}
+	myCache := MyFakeCache{}
 
 	index := NewResourceMonitorIndex()
-	rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0])
-	fmt.Println(rm)
-	//rm2 := index.GetOrCreateResourceLevelMonitor(ctx, scope, myInformer, kinds[0])
+	rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, myCache, kinds[0])
+	rm2 := index.GetOrCreateResourceLevelMonitor(ctx, scope, myCache, kinds[0])
 
-	//assert.Equal(t, rm, rm2)
+	assert.Equal(t, rm, rm2)
 }
23 changes: 2 additions & 21 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go
@@ -3,7 +3,6 @@ package k8s
 import (
 	"context"
 	"fmt"
-	"reflect"
 	"time"
 
 	"golang.org/x/time/rate"
@@ -16,7 +15,6 @@ import (
 	"k8s.io/apimachinery/pkg/util/validation"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
-	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/event"
@@ -641,11 +639,8 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
 	}
 
 	// Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on
-	pluginInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch)
-	if err != nil {
-		return nil, err
-	}
-	rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, pluginInformer, gvk)
+	rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, kubeClient.GetCache(), gvk)
 
 	// Start the poller and gauge emitter
 	rm.RunCollectorOnce(ctx)
 
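The call site can pass kubeClient.GetCache() straight through because the pluginsCore.KubeClient interface already exposes the controller-runtime cache, so the informer-extraction detour below is no longer needed.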
@@ -667,17 +662,3 @@ func getPluginGvk(resourceToWatch runtime.Object) (schema.GroupVersionKind, erro
 	}
 	return kinds[0], nil
 }
-
-func getPluginSharedInformer(ctx context.Context, kubeClient pluginsCore.KubeClient, resourceToWatch client.Object) (cache.SharedIndexInformer, error) {
-	i, err := kubeClient.GetCache().GetInformer(ctx, resourceToWatch)
-	if err != nil {
-		return nil, errors.Wrapf(errors.PluginInitializationFailed, err, "Error getting informer for %s", reflect.TypeOf(i))
-	}
-
-	si, casted := i.(cache.SharedIndexInformer)
-	if !casted {
-		return nil, errors.Errorf(errors.PluginInitializationFailed, "wrong type. Actual: %v", reflect.TypeOf(i))
-	}
-
-	return si, nil
-}
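Presumably this removed helper is where limit-namespace broke: when the cache is restricted to a namespace, controller-runtime can hand back a wrapped (multi-namespace) informer that is not a client-go cache.SharedIndexInformer, so the type assertion failed and plugin initialization errored out with PluginInitializationFailed. Listing through the cache.Cache interface sidesteps the cast entirely; this diagnosis is an inference from the change, not stated in the commit message.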
flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go
@@ -990,10 +990,7 @@ func TestResourceManagerConstruction(t *testing.T) {
 	gvk, err := getPluginGvk(&v1.Pod{})
 	assert.NoError(t, err)
 	assert.Equal(t, gvk.Kind, "Pod")
-	si, err := getPluginSharedInformer(ctx, fakeKubeClient, &v1.Pod{})
-	assert.NotNil(t, si)
-	assert.NoError(t, err)
-	rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, si, gvk)
+	rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, fakeKubeClient.GetCache(), gvk)
 	assert.NotNil(t, rm)
 }

Expand Down

0 comments on commit e8588f3

Please sign in to comment.