Skip to content

Commit

Permalink
fix(propeller): Replace SharedIndexInformer with Informer
Browse files: browse the repository at this point in the history
Resolves: flyteorg#5087
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed Mar 29, 2024
1 parent c61d976 commit 80037f5
Show file tree
Hide file tree
Showing 5 changed files with 614 additions and 37 deletions.
43 changes: 24 additions & 19 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ package k8s

import (
"context"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
corev1 "k8s.io/api/core/v1"
"runtime/pprof"
"strings"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"

"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/cache"
)

const resourceLevelMonitorCycleDuration = 10 * time.Second
Expand All @@ -34,7 +34,10 @@ type ResourceLevelMonitor struct {
Levels *labeled.Gauge

// This informer is only consulted for its sync status (HasSynced) before collection starts; the objects to tally are listed through kubeClient
sharedInformer cache.SharedIndexInformer
informer cache.Informer

// This kubeClient will be used to get a list of the underlying objects that we want a tally of
kubeClient core.KubeClient

// The kind here will be used to differentiate all the metrics, we'll leave out group and version for now
gvk schema.GroupVersionKind
Expand All @@ -49,18 +52,13 @@ type ResourceLevelMonitor struct {
// K8s resource created by a plugin will have (as yet, Flyte doesn't have any plugins that create cluster level resources and
// it probably won't for a long time). We can't assume that all the operators and CRDs that Flyte will ever work with will have
// the exact same set of labels or annotations or owner references. The only thing we can really count on is namespace.
func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interface{}) map[string]int {
func (r *ResourceLevelMonitor) countList(ctx context.Context, pods *corev1.PodList) map[string]int {
// Map of namespace to counts
counts := map[string]int{}

// Collect the object counts by namespace
for _, v := range objects {
metadata, err := meta.Accessor(v)
if err != nil {
logger.Errorf(ctx, "Error converting obj %v to an Accessor %s\n", v, err)
continue
}
counts[metadata.GetNamespace()]++
for _, pod := range pods.Items {
counts[pod.Namespace]++
}

return counts
Expand All @@ -70,8 +68,14 @@ func (r *ResourceLevelMonitor) countList(ctx context.Context, objects []interfac
func (r *ResourceLevelMonitor) collect(ctx context.Context) {
// Emit gauges at the namespace layer - since these are just K8s resources, we cannot be guaranteed to have the necessary
// information to derive project/domain
objects := r.sharedInformer.GetStore().List()
counts := r.countList(ctx, objects)

pods := &corev1.PodList{}
if err := r.kubeClient.GetClient().List(ctx, pods); err != nil {
logger.Errorf(ctx, "Error listing objects %s\n", err)
return
}

counts := r.countList(ctx, pods)

for ns, count := range counts {
withNamespaceCtx := contextutils.WithNamespace(ctx, ns)
Expand All @@ -89,7 +93,7 @@ func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) {
go func() {
defer ticker.Stop()
pprof.SetGoroutineLabels(collectorCtx)
r.sharedInformer.HasSynced()
r.informer.HasSynced()
logger.Infof(ctx, "K8s resource collector %s has synced", r.gvk.Kind)
for {
select {
Expand Down Expand Up @@ -125,8 +129,8 @@ type ResourceMonitorIndex struct {
stopwatches map[promutils.Scope]*labeled.StopWatch
}

func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, si cache.SharedIndexInformer,
gvk schema.GroupVersionKind) *ResourceLevelMonitor {
func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Context, scope promutils.Scope, informer cache.Informer,
gvk schema.GroupVersionKind, kubeClient core.KubeClient) *ResourceLevelMonitor {

logger.Infof(ctx, "Attempting to create K8s gauge emitter for kind %s/%s", gvk.Version, gvk.Kind)

Expand Down Expand Up @@ -157,8 +161,9 @@ func (r *ResourceMonitorIndex) GetOrCreateResourceLevelMonitor(ctx context.Conte
Scope: scope,
CollectorTimer: r.stopwatches[scope],
Levels: r.gauges[scope],
sharedInformer: si,
informer: informer,
gvk: gvk,
kubeClient: kubeClient,
}
r.monitors[gvk] = rm

Expand Down
16 changes: 5 additions & 11 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -641,11 +641,11 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
}

// Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on
pluginInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch)
pluginInformer, err := getPluginInformer(ctx, kubeClient, entry.ResourceToWatch)
if err != nil {
return nil, err
}
rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, pluginInformer, gvk)
rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, pluginInformer, gvk, kubeClient)
// Start the poller and gauge emitter
rm.RunCollectorOnce(ctx)

Expand All @@ -668,16 +668,10 @@ func getPluginGvk(resourceToWatch runtime.Object) (schema.GroupVersionKind, erro
return kinds[0], nil
}

func getPluginSharedInformer(ctx context.Context, kubeClient pluginsCore.KubeClient, resourceToWatch client.Object) (cache.SharedIndexInformer, error) {
func getPluginInformer(ctx context.Context, kubeClient pluginsCore.KubeClient, resourceToWatch client.Object) (cache.Informer, error) {
i, err := kubeClient.GetCache().GetInformer(ctx, resourceToWatch)
if err != nil {
return nil, errors.Wrapf(errors.PluginInitializationFailed, err, "Error getting informer for %s", reflect.TypeOf(i))
}

si, casted := i.(cache.SharedIndexInformer)
if !casted {
return nil, errors.Errorf(errors.PluginInitializationFailed, "wrong type. Actual: %v", reflect.TypeOf(i))
}

return si, nil
return i, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ func TestResourceManagerConstruction(t *testing.T) {
gvk, err := getPluginGvk(&v1.Pod{})
assert.NoError(t, err)
assert.Equal(t, gvk.Kind, "Pod")
si, err := getPluginSharedInformer(ctx, fakeKubeClient, &v1.Pod{})
si, err := getPluginInformer(ctx, fakeKubeClient, &v1.Pod{})
assert.NotNil(t, si)
assert.NoError(t, err)
rm := index.GetOrCreateResourceLevelMonitor(ctx, scope, si, gvk)
Expand Down
43 changes: 39 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ go 1.21
require (
github.com/flyteorg/flyte/datacatalog v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flyteadmin v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytepropeller v1.9.12
github.com/flyteorg/flyte/flytestdlib v1.9.12
github.com/flyteorg/flytectl v0.8.14
github.com/golang/glog v1.2.0
github.com/prometheus/client_golang v1.16.0
github.com/spf13/cobra v1.7.0
Expand All @@ -27,11 +28,16 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.0 // indirect
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625 // indirect
github.com/Microsoft/go-winio v0.5.0 // indirect
github.com/NYTimes/gizmo v1.3.6 // indirect
github.com/Shopify/sarama v1.26.4 // indirect
github.com/apoorvam/goterminal v0.0.0-20180523175556-614d345c47e5 // indirect
github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/awalterschulze/gographviz v2.0.3+incompatible // indirect
github.com/aws/aws-sdk-go v1.44.2 // indirect
github.com/aws/aws-sdk-go-v2 v1.2.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.0.0 // indirect
Expand All @@ -50,21 +56,31 @@ require (
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 // indirect
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.8.0 // indirect
github.com/cloudevents/sdk-go/v2 v2.14.0 // indirect
github.com/containerd/containerd v1.5.10 // indirect
github.com/coocood/freecache v1.1.1 // indirect
github.com/coreos/go-oidc/v3 v3.6.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/danieljoos/wincred v1.1.0 // indirect
github.com/dask/dask-kubernetes/v2023 v2023.0.0-20230626103304-abd02cd17b26 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v3 v3.0.0 // indirect
github.com/dgraph-io/ristretto v0.0.3 // indirect
github.com/disiqueira/gotree v1.0.0 // indirect
github.com/docker/distribution v2.8.0+incompatible // indirect
github.com/docker/docker v20.10.7+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/enescakir/emoji v1.0.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000 // indirect
github.com/flyteorg/flyte/flyteidl v1.9.12 // indirect
github.com/flyteorg/flyte/flyteplugins v0.0.0-00010101000000-000000000000 // indirect
github.com/flyteorg/stow v0.3.10 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
Expand All @@ -76,9 +92,11 @@ require (
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-ozzo/ozzo-validation/v4 v4.3.0 // indirect
github.com/go-redis/redis v6.15.7+incompatible // indirect
github.com/go-test/deep v1.0.7 // indirect
github.com/goccy/go-json v0.4.8 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/gofrs/uuid v4.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
Expand All @@ -88,6 +106,8 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-github/v42 v42.0.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
Expand All @@ -101,8 +121,10 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/gtank/cryptopasta v0.0.0-20170601214702-1f550f6f2f69 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/go-version v1.3.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hexops/gotextdiff v1.0.3 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
Expand All @@ -118,11 +140,14 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
github.com/kataras/tablewriter v0.0.0-20180708051242-e063d29b7c23 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/klauspost/compress v1.9.8 // indirect
github.com/klauspost/compress v1.11.13 // indirect
github.com/kubeflow/common v0.4.3 // indirect
github.com/kubeflow/training-operator v1.5.0-rc.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/landoop/tableprinter v0.0.0-20180806200924-8bd8c2576d27 // indirect
github.com/lestrrat-go/backoff/v2 v2.0.7 // indirect
github.com/lestrrat-go/httpcc v1.0.0 // indirect
github.com/lestrrat-go/iter v1.0.1 // indirect
Expand All @@ -132,15 +157,21 @@ require (
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect
github.com/mattn/goveralls v0.0.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/term v0.0.0-20221205130635-1aeaba878587 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mouuff/go-rocket-update v1.5.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/ory/fosite v0.42.2 // indirect
github.com/ory/go-acc v0.2.6 // indirect
github.com/ory/go-convenience v0.1.0 // indirect
Expand All @@ -158,7 +189,9 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/ray-project/kuberay/ray-operator v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron/v3 v3.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sendgrid/rest v2.6.8+incompatible // indirect
github.com/sendgrid/sendgrid-go v3.10.0+incompatible // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand All @@ -174,6 +207,8 @@ require (
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
github.com/wI2L/jsondiff v0.5.0 // indirect
github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 // indirect
github.com/zalando/go-keyring v0.1.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
Expand Down
Loading

0 comments on commit 80037f5

Please sign in to comment.