From 6858c58c3096c6c32953231c2546ccf14f4eeb24 Mon Sep 17 00:00:00 2001
From: Maryam Tahhan
Date: Thu, 15 Aug 2024 13:48:53 +0100
Subject: [PATCH] fix: watcher resubmit items to workqueue (#1686)

Fixes https://github.com/sustainable-computing-io/kepler/pull/1659

Signed-off-by: Maryam Tahhan
---
 cmd/exporter/exporter.go                    |   4 +
 go.mod                                      |   2 +-
 hack/verify.sh                              |   4 +-
 manifests/k8s/config/exporter/exporter.yaml |   8 +-
 pkg/kubernetes/watcher.go                   | 109 +++++++++++++-------
 pkg/manager/manager.go                      |  14 ++-
 6 files changed, 99 insertions(+), 42 deletions(-)

diff --git a/cmd/exporter/exporter.go b/cmd/exporter/exporter.go
index 64f62b9525..f763814547 100644
--- a/cmd/exporter/exporter.go
+++ b/cmd/exporter/exporter.go
@@ -152,6 +152,10 @@ func main() {
 	defer bpfExporter.Detach()
 
 	m := manager.New(bpfExporter)
+	if m == nil {
+		klog.Fatal("could not create a collector manager")
+	}
+	defer m.Stop()
 
 	// starting a CollectorManager instance to collect data and report metrics
 	if startErr := m.Start(); startErr != nil {
diff --git a/go.mod b/go.mod
index 3eb68a2c6d..38525b6b81 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ require (
 	github.com/sirupsen/logrus v1.9.3
 	golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
 	golang.org/x/sys v0.22.0
+	golang.org/x/time v0.5.0
 	gopkg.in/yaml.v3 v3.0.1
 	k8s.io/api v0.29.7
 	k8s.io/apimachinery v0.29.7
@@ -76,7 +77,6 @@ require (
 	golang.org/x/oauth2 v0.21.0 // indirect
 	golang.org/x/term v0.22.0 // indirect
 	golang.org/x/text v0.16.0 // indirect
-	golang.org/x/time v0.5.0 // indirect
 	golang.org/x/tools v0.23.0 // indirect
 	google.golang.org/protobuf v1.34.1 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
diff --git a/hack/verify.sh b/hack/verify.sh
index 2e526a0dae..655188de05 100755
--- a/hack/verify.sh
+++ b/hack/verify.sh
@@ -64,8 +64,9 @@ log_kepler() {
 }
 
 exec_kepler() {
-    run kubectl exec -ti -n kepler daemonset/kepler-exporter curl "localhost:9102/metrics" |grep ^kepler_
+    run kubectl exec -n "$KEPLER_NS" daemonset/"$EXPORTER" -- curl "localhost:9102/metrics" |grep ^kepler_
 }
+
 watch_service() {
     local port="$1"
     local ns="$2"
@@ -78,7 +79,6 @@ intergration_test() {
     log_kepler &
     # dump metrics before running tests
     exec_kepler &
-    local ret=0
     go test ./e2e/integration-test/... -v --race --bench=. -cover --count=1 --vet=all \
         2>&1 | tee "$ARTIFACT_DIR/e2e.log" || ret=1
 
diff --git a/manifests/k8s/config/exporter/exporter.yaml b/manifests/k8s/config/exporter/exporter.yaml
index 67d35ea71a..f9ae55b323 100644
--- a/manifests/k8s/config/exporter/exporter.yaml
+++ b/manifests/k8s/config/exporter/exporter.yaml
@@ -49,8 +49,12 @@ spec:
         app.kubernetes.io/name: kepler-exporter
     spec:
       tolerations:
-      - effect: NoSchedule
-        key: node-role.kubernetes.io/master
+      - key: node-role.kubernetes.io/control-plane
+        operator: Exists
+        effect: NoSchedule
+      - key: node-role.kubernetes.io/master
+        operator: Exists
+        effect: NoSchedule
       dnsPolicy: ClusterFirstWithHostNet
       serviceAccountName: kepler-sa
       hostPID: true
diff --git a/pkg/kubernetes/watcher.go b/pkg/kubernetes/watcher.go
index 45906f4853..3d67e7e9b7 100644
--- a/pkg/kubernetes/watcher.go
+++ b/pkg/kubernetes/watcher.go
@@ -24,6 +24,8 @@ import (
 
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
@@ -95,7 +97,7 @@ func newK8sClient() *kubernetes.Clientset {
 	return clientset
 }
 
-func NewObjListWatcher(bpfSupportedMetrics bpf.SupportedMetrics) *ObjListWatcher {
+func NewObjListWatcher(bpfSupportedMetrics bpf.SupportedMetrics) (*ObjListWatcher, error) {
 	w := &ObjListWatcher{
 		stopChannel: make(chan struct{}),
 		k8sCli:      newK8sClient(),
@@ -104,10 +106,10 @@ func NewObjListWatcher(bpfSupportedMetrics bpf.SupportedMetrics) *ObjListWatcher
 		workqueue:   workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 	}
 	if w.k8sCli == nil || !config.EnableAPIServer {
-		return w
+		return w, nil
 	}
 	optionsModifier := func(options *metav1.ListOptions) {
-		options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", stats.NodeName) // to filter events per node
+		options.FieldSelector = fields.Set{"spec.nodeName": stats.GetNodeName()}.AsSelector().String() // to filter events per node
 	}
 	objListWatcher := cache.NewFilteredListWatchFromClient(
 		w.k8sCli.CoreV1().RESTClient(),
@@ -117,36 +119,43 @@ func NewObjListWatcher(bpfSupportedMetrics bpf.SupportedMetrics) *ObjListWatcher
 	)
 	w.informer = cache.NewSharedIndexInformer(objListWatcher, &corev1.Pod{}, 0, cache.Indexers{})
 	w.stopChannel = make(chan struct{})
+
 	_, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			key, err := cache.MetaNamespaceKeyFunc(obj)
 			if err == nil {
-				w.workqueue.Add(key)
+				w.workqueue.AddRateLimited(key)
 			}
+			utilruntime.HandleError(err)
 		},
 		UpdateFunc: func(old interface{}, new interface{}) {
 			key, err := cache.MetaNamespaceKeyFunc(new)
 			if err == nil {
-				w.workqueue.Add(key)
+				w.workqueue.AddRateLimited(key)
 			}
+			utilruntime.HandleError(err)
 		},
 		DeleteFunc: func(obj interface{}) {
 			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
 			if err == nil {
-				w.workqueue.Add(key)
+				w.workqueue.AddRateLimited(key)
 			}
+			utilruntime.HandleError(err)
 		},
 	})
+
 	if err != nil {
-		klog.Fatalf("%v", err)
+		klog.Errorf("%v", err)
+		return nil, err
 	}
 	IsWatcherEnabled = true
-	return w
+	return w, nil
 }
 
 func (w *ObjListWatcher) processNextItem() bool {
 	key, quit := w.workqueue.Get()
 	if quit {
+		klog.V(5).Info("quitting processNextItem")
 		return false
 	}
 	defer w.workqueue.Done(key)
@@ -159,43 +168,45 @@ func (w *ObjListWatcher) handleErr(err error, key interface{}) {
 	// No error!
 	if err == nil {
+		klog.V(5).Infof("Successfully synced '%s'", key)
 		w.workqueue.Forget(key)
 		return
 	}
 
-	// Retry
+	// Put the item back on the workqueue to handle any transient errors
+	// if it hasn't already been requeued more times than our maxRetries
 	if w.workqueue.NumRequeues(key) < maxRetries {
-		klog.Errorf("Error syncing pod %v: %v", key, err)
+		klog.V(5).Infof("failed to sync pod %v: %v ... requeuing, retries %v", key, err, w.workqueue.NumRequeues(key))
 		w.workqueue.AddRateLimited(key)
 		return
 	}
-
-	// Give up
+	// Give up if we've exceeded maxRetries and remove the item from the queue
+	klog.V(5).Infof("Dropping pod %q out of the queue: %v", key, err)
 	w.workqueue.Forget(key)
-	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
+
+	// handle any errors that occurred
+	utilruntime.HandleError(err)
 }
 
 func (w *ObjListWatcher) handleEvent(key string) error {
 	obj, exists, err := w.informer.GetIndexer().GetByKey(key)
 	if err != nil {
-		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
+		klog.Errorf("fetching object with key %s from store failed with %v", key, err)
 		return err
 	}
+
 	if !exists {
-		w.handleDeleted(obj)
-	} else {
-		w.handleAdd(obj)
+		return w.handleDeleted(obj)
 	}
-	return nil
+
+	return w.handleAdd(obj)
 }
 
-func (w *ObjListWatcher) Run() {
+func (w *ObjListWatcher) Run() error {
 	if !IsWatcherEnabled {
-		klog.Infoln("k8s APIserver watcher was not enabled")
-		return
+		return fmt.Errorf("k8s APIserver watcher was not enabled")
 	}
 
-	defer w.workqueue.ShutDown()
-
+	defer utilruntime.HandleCrash()
 	go w.informer.Run(w.stopChannel)
 
 	timeoutCh := make(chan struct{})
@@ -203,8 +214,10 @@
 		close(timeoutCh)
 	})
 	defer timeoutTimer.Stop()
+
+	klog.V(5).Info("Waiting for caches to sync")
 	if !cache.WaitForCacheSync(timeoutCh, w.informer.HasSynced) {
-		klog.Fatalf("watcher timed out waiting for caches to sync")
+		return fmt.Errorf("watcher timed out waiting for caches to sync")
 	}
 
 	// launch workers to handle events
@@ -213,6 +226,7 @@
 	}
 
 	klog.Infoln("k8s APIserver watcher was started")
+	return nil
 }
 
 func (w *ObjListWatcher) runWorker() {
@@ -225,16 +239,18 @@ func (w *ObjListWatcher) Stop() {
 	close(w.stopChannel)
 }
 
-func (w *ObjListWatcher) handleAdd(obj interface{}) {
+func (w *ObjListWatcher) handleAdd(obj interface{}) error {
+	var err error
 	switch w.ResourceKind {
 	case podResourceType:
 		pod, ok := obj.(*corev1.Pod)
 		if !ok {
-			klog.Infof("Could not convert obj: %v", w.ResourceKind)
-			return
+			return fmt.Errorf("could not convert obj: %v", w.ResourceKind)
 		}
 		for _, condition := range pod.Status.Conditions {
 			if condition.Type != corev1.ContainersReady {
+				// set the error in case we reach the end of the loop and no ContainersReady condition is found
+				err = fmt.Errorf("containers not ready in pod: %v", pod.Name)
 				continue
 			}
 			klog.V(5).Infof("Pod %s %s is ready with %d container statuses, %d init container status, %d ephemeral statues",
@@ -244,12 +260,17 @@ func (w *ObjListWatcher) handleAdd(obj interface{}) {
 			err2 := w.fillInfo(pod, pod.Status.InitContainerStatuses)
 			err3 := w.fillInfo(pod, pod.Status.EphemeralContainerStatuses)
 			w.Mx.Unlock()
-			klog.V(5).Infof("parsing pod %s %s status: %v %v %v", pod.Name, pod.Namespace, err1, err2, err3)
+			if err1 != nil || err2 != nil || err3 != nil {
+				err = fmt.Errorf("parsing pod %s %s ContainerStatuses issue: %v, InitContainerStatuses issue: %v, EphemeralContainerStatuses issue: %v", pod.Name, pod.Namespace, err1, err2, err3)
+				return err
+			}
+			klog.V(5).Infof("parsing pod %s %s status: %v %v %v", pod.Name, pod.Namespace, pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses, pod.Status.EphemeralContainerStatuses)
+			return nil
 		}
 	default:
-		klog.Infof("Watcher does not support object type %s", w.ResourceKind)
-		return
+		err = fmt.Errorf("watcher does not support object type %s", w.ResourceKind)
 	}
+	return err
 }
 
 func (w *ObjListWatcher) fillInfo(pod *corev1.Pod, containers []corev1.ContainerStatus) error {
@@ -259,7 +280,8 @@ func (w *ObjListWatcher) fillInfo(pod *corev1.Pod, containers []corev1.Container
 		containerID := ParseContainerIDFromPodStatus(containers[j].ContainerID)
 		// verify if container ID was already initialized
 		if containerID == "" {
-			err = fmt.Errorf("container %s did not start yet", containers[j].Name)
+			// mark the error to requeue to the workqueue
+			err = fmt.Errorf("container %s has not started yet", containers[j].Name)
 			continue
 		}
 		if _, exist = w.ContainerStats[containerID]; !exist {
@@ -273,22 +295,23 @@ func (w *ObjListWatcher) fillInfo(pod *corev1.Pod, containers []corev1.Container
 	return err
 }
 
-func (w *ObjListWatcher) handleDeleted(obj interface{}) {
+func (w *ObjListWatcher) handleDeleted(obj interface{}) error {
 	switch w.ResourceKind {
 	case podResourceType:
 		pod, ok := obj.(*corev1.Pod)
 		if !ok {
-			klog.Fatalf("Could not convert obj: %v", w.ResourceKind)
+			return fmt.Errorf("could not convert obj: %v", w.ResourceKind)
 		}
 		w.Mx.Lock()
 		w.deleteInfo(pod.Status.ContainerStatuses)
 		w.deleteInfo(pod.Status.InitContainerStatuses)
 		w.deleteInfo(pod.Status.EphemeralContainerStatuses)
 		w.Mx.Unlock()
+		klog.V(5).Infof("deleting pod %s %s", pod.Name, pod.Namespace)
 	default:
-		klog.Infof("Watcher does not support object type %s", w.ResourceKind)
-		return
+		return fmt.Errorf("watcher does not support object type %s", w.ResourceKind)
 	}
+	return nil
 }
 
 // TODO: instead of delete, it might be better to mark it to delete since k8s takes time to really delete an object
@@ -302,3 +325,19 @@ func (w *ObjListWatcher) deleteInfo(containers []corev1.ContainerStatus) {
 func ParseContainerIDFromPodStatus(containerID string) string {
 	return regexReplaceContainerIDPrefix.ReplaceAllString(containerID, "")
 }
+
+func (w *ObjListWatcher) ShutDownWithDrain() {
+	done := make(chan struct{})
+
+	// ShutDownWithDrain waits for all in-flight work to complete and thus could block indefinitely, so put a deadline on it.
+	go func() {
+		w.workqueue.ShutDownWithDrain()
+		done <- struct{}{}
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		klog.Warningf("timed out draining the queue on shutdown")
+	}
+}
diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go
index a60ac9d63a..5094ce453d 100644
--- a/pkg/manager/manager.go
+++ b/pkg/manager/manager.go
@@ -24,6 +24,7 @@ import (
 	"github.com/sustainable-computing-io/kepler/pkg/config"
 	"github.com/sustainable-computing-io/kepler/pkg/kubernetes"
 	exporter "github.com/sustainable-computing-io/kepler/pkg/metrics"
+	"k8s.io/klog/v2"
 )
 
 var (
@@ -42,6 +43,7 @@ type CollectorManager struct {
 }
 
 func New(bpfExporter bpf.Exporter) *CollectorManager {
+	var err error
 	manager := &CollectorManager{}
 	supportedMetrics := bpfExporter.SupportedMetrics()
 	manager.StatsCollector = collector.NewCollector(bpfExporter)
@@ -52,10 +54,14 @@ func New(bpfExporter bpf.Exporter) *CollectorManager {
 	manager.PrometheusCollector.NewVMCollector(manager.StatsCollector.VMStats)
 	manager.PrometheusCollector.NewNodeCollector(&manager.StatsCollector.NodeStats)
 	// configure the watcher
-	manager.Watcher = kubernetes.NewObjListWatcher(supportedMetrics)
+	if manager.Watcher, err = kubernetes.NewObjListWatcher(supportedMetrics); err != nil {
+		klog.Errorf("could not create the watcher: %v", err)
+	}
 	manager.Watcher.Mx = &manager.PrometheusCollector.Mx
 	manager.Watcher.ContainerStats = manager.StatsCollector.ContainerStats
-	manager.Watcher.Run()
+	if err = manager.Watcher.Run(); err != nil {
+		klog.Errorf("could not run the watcher: %v", err)
+	}
 
 	return manager
 }
@@ -79,3 +85,7 @@ func (m *CollectorManager) Start() error {
 
 	return nil
 }
+
+func (m *CollectorManager) Stop() {
+	m.Watcher.ShutDownWithDrain()
+}
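
The requeue flow introduced in handleErr above follows the usual client-go controller pattern: Forget on success, AddRateLimited while NumRequeues is under the retry budget, and Forget plus HandleError once the budget is exhausted. Below is a minimal standalone sketch of that pattern; the maxRetries value and the syncKey helper are illustrative stand-ins, not taken from this patch.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
)

const maxRetries = 5 // illustrative retry budget, not necessarily Kepler's value

// syncKey stands in for the real per-item work (handleEvent in the patch);
// it always fails here so the retry path is exercised.
func syncKey(key string) error {
	return fmt.Errorf("simulated transient failure for %s", key)
}

// processNextItem mirrors the processNextItem/handleErr flow in watcher.go.
func processNextItem(queue workqueue.RateLimitingInterface) bool {
	item, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(item)

	err := syncKey(item.(string))
	switch {
	case err == nil:
		// Success: clear the rate limiter's failure history for this key.
		queue.Forget(item)
	case queue.NumRequeues(item) < maxRetries:
		// Transient error: put the key back with backoff instead of dropping it.
		klog.Infof("requeueing %v (retry %d): %v", item, queue.NumRequeues(item), err)
		queue.AddRateLimited(item)
	default:
		// Retry budget exhausted: drop the key so the queue cannot grow without bound.
		klog.Infof("dropping %v: %v", item, err)
		queue.Forget(item)
	}
	return true
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("default/example-pod")

	go func() {
		for processNextItem(queue) {
		}
	}()

	time.Sleep(3 * time.Second) // let a few retries play out
	queue.ShutDown()
	time.Sleep(100 * time.Millisecond)
}

With the default controller rate limiter, AddRateLimited applies a per-item exponential backoff combined with an overall token bucket, so a pod whose containers are not ready yet is retried with increasing delay rather than being re-added immediately.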
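
The new ShutDownWithDrain wrapper guards against a drain that never finishes, for example when an item was taken from the queue but never marked Done. The sketch below reproduces that guard in isolation under the same 5-second deadline; the stuck-item setup in main is contrived purely to trigger the timeout path.

package main

import (
	"time"

	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
)

// shutDownWithDrain waits for in-flight items to be marked Done, but gives up
// after the timeout so shutdown can never hang forever.
func shutDownWithDrain(queue workqueue.RateLimitingInterface, timeout time.Duration) {
	done := make(chan struct{})
	go func() {
		queue.ShutDownWithDrain()
		close(done)
	}()

	select {
	case <-done:
		klog.Info("queue drained cleanly")
	case <-time.After(timeout):
		klog.Warning("timed out draining the queue on shutdown")
	}
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// An item that is Get()-ed but never Done()-ed keeps the drain waiting,
	// which is exactly the case the deadline protects against.
	queue.Add("stuck-item")
	queue.Get()

	shutDownWithDrain(queue, 5*time.Second)
}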
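
Separately, the switch in NewObjListWatcher from fmt.Sprintf to fields.Set for the node-name field selector can be checked on its own: for simple names the resulting selector string is the same, but fields.Set also handles escaping through the library. The node name below is a made-up value for the example.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/fields"
)

func main() {
	node := "worker-1" // hypothetical node name

	// fields.Set builds the selector the watcher now passes to the
	// filtered list/watch, with escaping handled by apimachinery.
	selector := fields.Set{"spec.nodeName": node}.AsSelector().String()
	fmt.Println(selector) // prints: spec.nodeName=worker-1
}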