fix: watcher resubmit items to workqueue (#1686)
Fixes #1659

Signed-off-by: Maryam Tahhan <[email protected]>
maryamtahhan authored Aug 15, 2024
1 parent eb5a72a commit 6858c58
Showing 6 changed files with 99 additions and 42 deletions.
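
The heart of the fix is in pkg/kubernetes/watcher.go: event keys are now enqueued with AddRateLimited instead of Add, failed keys are requeued by handleErr until maxRetries is exceeded, and only then dropped with Forget. The sketch below shows that client-go requeue pattern in isolation; the package name, the syncPod stand-in, and the maxRetries value of 5 are illustrative assumptions, not code taken from this repository.

package watcherdemo

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/util/workqueue"
)

// maxRetries is an illustrative limit; the real value lives in watcher.go.
const maxRetries = 5

// processNextItem pops one key, tries to sync it, and decides whether the key
// should be retried with back-off or dropped for good.
func processNextItem(queue workqueue.RateLimitingInterface) bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(key)

	if err := syncPod(key.(string)); err == nil {
		// Success: clear the rate limiter's failure history for this key.
		queue.Forget(key)
	} else if queue.NumRequeues(key) < maxRetries {
		// Transient failure: resubmit the key so it is retried with back-off.
		queue.AddRateLimited(key)
	} else {
		// Persistent failure: give up on the key and report the error.
		queue.Forget(key)
		utilruntime.HandleError(fmt.Errorf("dropping %q out of the queue: %w", key, err))
	}
	return true
}

// syncPod is a hypothetical stand-in for the watcher's per-pod handling.
func syncPod(key string) error { return nil }
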
4 changes: 4 additions & 0 deletions cmd/exporter/exporter.go
@@ -152,6 +152,10 @@ func main() {
defer bpfExporter.Detach()

m := manager.New(bpfExporter)
if m == nil {
klog.Fatal("could not create a collector manager")
}
defer m.Stop()

// starting a CollectorManager instance to collect data and report metrics
if startErr := m.Start(); startErr != nil {
2 changes: 1 addition & 1 deletion go.mod
@@ -23,6 +23,7 @@ require (
github.com/sirupsen/logrus v1.9.3
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/sys v0.22.0
golang.org/x/time v0.5.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.29.7
k8s.io/apimachinery v0.29.7
@@ -76,7 +77,6 @@ require (
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.23.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
4 changes: 2 additions & 2 deletions hack/verify.sh
@@ -64,8 +64,9 @@ log_kepler() {
}

exec_kepler() {
run kubectl exec -ti -n kepler daemonset/kepler-exporter curl "localhost:9102/metrics" |grep ^kepler_
run kubectl exec -n "$KEPLER_NS" daemonset/"$EXPORTER" -- curl "localhost:9102/metrics" |grep ^kepler_
}

watch_service() {
local port="$1"
local ns="$2"
@@ -78,7 +79,6 @@ intergration_test() {
log_kepler &
# dump metrics before running tests
exec_kepler &

local ret=0
go test ./e2e/integration-test/... -v --race --bench=. -cover --count=1 --vet=all \
2>&1 | tee "$ARTIFACT_DIR/e2e.log" || ret=1
8 changes: 6 additions & 2 deletions manifests/k8s/config/exporter/exporter.yaml
@@ -49,8 +49,12 @@ spec:
app.kubernetes.io/name: kepler-exporter
spec:
tolerations:
- effect: NoSchedule
key: node-role.kubernetes.io/master
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
dnsPolicy: ClusterFirstWithHostNet
serviceAccountName: kepler-sa
hostPID: true
109 changes: 74 additions & 35 deletions pkg/kubernetes/watcher.go
@@ -24,6 +24,8 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -95,7 +97,7 @@ func newK8sClient() *kubernetes.Clientset {
return clientset
}

func NewObjListWatcher(bpfSupportedMetrics bpf.SupportedMetrics) *ObjListWatcher {
func NewObjListWatcher(bpfSupportedMetrics bpf.SupportedMetrics) (*ObjListWatcher, error) {
w := &ObjListWatcher{
stopChannel: make(chan struct{}),
k8sCli: newK8sClient(),
@@ -104,10 +106,10 @@ func NewObjListWatcher(bpfSupportedMetrics bpf.SupportedMetrics) *ObjListWatcher
workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}
if w.k8sCli == nil || !config.EnableAPIServer {
return w
return w, nil
}
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", stats.NodeName) // to filter events per node
options.FieldSelector = fields.Set{"spec.nodeName": stats.GetNodeName()}.AsSelector().String() // to filter events per node
}
objListWatcher := cache.NewFilteredListWatchFromClient(
w.k8sCli.CoreV1().RESTClient(),
@@ -117,36 +119,43 @@
)
w.informer = cache.NewSharedIndexInformer(objListWatcher, &corev1.Pod{}, 0, cache.Indexers{})
w.stopChannel = make(chan struct{})

_, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
w.workqueue.Add(key)
w.workqueue.AddRateLimited(key)
}
utilruntime.HandleError(err)
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
w.workqueue.Add(key)
w.workqueue.AddRateLimited(key)
}
utilruntime.HandleError(err)
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
w.workqueue.Add(key)
w.workqueue.AddRateLimited(key)
}
utilruntime.HandleError(err)
},
})

if err != nil {
klog.Fatalf("%v", err)
klog.Errorf("%v", err)
return nil, err
}
IsWatcherEnabled = true
return w
return w, nil
}

func (w *ObjListWatcher) processNextItem() bool {
key, quit := w.workqueue.Get()
if quit {
klog.V(5).Info("quitting processNextItem")
return false
}
defer w.workqueue.Done(key)
@@ -159,52 +168,56 @@
func (w *ObjListWatcher) handleErr(err error, key interface{}) {
// No error!
if err == nil {
klog.V(5).Infof("Successfully synced '%s'", key)
w.workqueue.Forget(key)
return
}

// Retry
// Put the item back on the workqueue to handle any transient errors
// if it hasn't already been requeued more times than our maxRetries
if w.workqueue.NumRequeues(key) < maxRetries {
klog.Errorf("Error syncing pod %v: %v", key, err)
klog.V(5).Infof("failed to sync pod %v: %v ... requeuing, retries %v", key, err, w.workqueue.NumRequeues(key))
w.workqueue.AddRateLimited(key)
return
}

// Give up
// Give up if we've exceeded MaxRetries, remove the item from the queue
klog.V(5).Infof("Dropping pod %q out of the queue: %v", key, err)
w.workqueue.Forget(key)
klog.Infof("Dropping pod %q out of the queue: %v", key, err)

// handle any errors that occurred
utilruntime.HandleError(err)
}

func (w *ObjListWatcher) handleEvent(key string) error {
obj, exists, err := w.informer.GetIndexer().GetByKey(key)
if err != nil {
klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
klog.Errorf("fetching object with key %s from store failed with %v", key, err)
return err
}

if !exists {
w.handleDeleted(obj)
} else {
w.handleAdd(obj)
return w.handleDeleted(obj)
}
return nil

return w.handleAdd(obj)
}

func (w *ObjListWatcher) Run() {
func (w *ObjListWatcher) Run() error {
if !IsWatcherEnabled {
klog.Infoln("k8s APIserver watcher was not enabled")
return
return fmt.Errorf("k8s APIserver watcher was not enabled")
}
defer w.workqueue.ShutDown()

defer utilruntime.HandleCrash()
go w.informer.Run(w.stopChannel)

timeoutCh := make(chan struct{})
timeoutTimer := time.AfterFunc(informerTimeout, func() {
close(timeoutCh)
})
defer timeoutTimer.Stop()

klog.V(5).Info("Waiting for caches to sync")
if !cache.WaitForCacheSync(timeoutCh, w.informer.HasSynced) {
klog.Fatalf("watcher timed out waiting for caches to sync")
return fmt.Errorf("watcher timed out waiting for caches to sync")
}

// launch workers to handle events
@@ -213,6 +226,7 @@
}

klog.Infoln("k8s APIserver watcher was started")
return nil
}

func (w *ObjListWatcher) runWorker() {
@@ -225,16 +239,18 @@
close(w.stopChannel)
}

func (w *ObjListWatcher) handleAdd(obj interface{}) {
func (w *ObjListWatcher) handleAdd(obj interface{}) error {
var err error
switch w.ResourceKind {
case podResourceType:
pod, ok := obj.(*corev1.Pod)
if !ok {
klog.Infof("Could not convert obj: %v", w.ResourceKind)
return
return fmt.Errorf("could not convert obj: %v", w.ResourceKind)
}
for _, condition := range pod.Status.Conditions {
if condition.Type != corev1.ContainersReady {
// set the error in case we reach the end of the loop and no ContainersReady condition is found
err = fmt.Errorf("containers not ready in pod: %v", pod.Name)
continue
}
klog.V(5).Infof("Pod %s %s is ready with %d container statuses, %d init container status, %d ephemeral statues",
@@ -244,12 +260,17 @@
err2 := w.fillInfo(pod, pod.Status.InitContainerStatuses)
err3 := w.fillInfo(pod, pod.Status.EphemeralContainerStatuses)
w.Mx.Unlock()
klog.V(5).Infof("parsing pod %s %s status: %v %v %v", pod.Name, pod.Namespace, err1, err2, err3)
if err1 != nil || err2 != nil || err3 != nil {
err = fmt.Errorf("parsing pod %s %s ContainerStatuses issue : %v, InitContainerStatuses issue :%v, EphemeralContainerStatuses issue :%v", pod.Name, pod.Namespace, err1, err2, err3)
return err
}
klog.V(5).Infof("parsing pod %s %s status: %v %v %v", pod.Name, pod.Namespace, pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses, pod.Status.EphemeralContainerStatuses)
return nil
}
default:
klog.Infof("Watcher does not support object type %s", w.ResourceKind)
return
err = fmt.Errorf("watcher does not support object type %s", w.ResourceKind)
}
return err
}

func (w *ObjListWatcher) fillInfo(pod *corev1.Pod, containers []corev1.ContainerStatus) error {
@@ -259,7 +280,8 @@
containerID := ParseContainerIDFromPodStatus(containers[j].ContainerID)
// verify if container ID was already initialized
if containerID == "" {
err = fmt.Errorf("container %s did not start yet", containers[j].Name)
// mark the error to requeue to the workqueue
err = fmt.Errorf("container %s did not start yet status", containers[j].Name)
continue
}
if _, exist = w.ContainerStats[containerID]; !exist {
@@ -273,22 +295,23 @@
return err
}

func (w *ObjListWatcher) handleDeleted(obj interface{}) {
func (w *ObjListWatcher) handleDeleted(obj interface{}) error {
switch w.ResourceKind {
case podResourceType:
pod, ok := obj.(*corev1.Pod)
if !ok {
klog.Fatalf("Could not convert obj: %v", w.ResourceKind)
return fmt.Errorf("could not convert obj: %v", w.ResourceKind)
}
w.Mx.Lock()
w.deleteInfo(pod.Status.ContainerStatuses)
w.deleteInfo(pod.Status.InitContainerStatuses)
w.deleteInfo(pod.Status.EphemeralContainerStatuses)
w.Mx.Unlock()
klog.V(5).Infof("deleting pod %s %s", pod.Name, pod.Namespace)
default:
klog.Infof("Watcher does not support object type %s", w.ResourceKind)
return
return fmt.Errorf("watcher does not support object type %s", w.ResourceKind)
}
return nil
}

// TODO: instead of delete, it might be better to mark it to delete since k8s takes time to really delete an object
@@ -302,3 +325,19 @@
func ParseContainerIDFromPodStatus(containerID string) string {
return regexReplaceContainerIDPrefix.ReplaceAllString(containerID, "")
}

func (w *ObjListWatcher) ShutDownWithDrain() {
done := make(chan struct{})

// ShutDownWithDrain waits for all in-flight work to complete and thus could block indefinitely so put a deadline on it.
go func() {
w.workqueue.ShutDownWithDrain()
done <- struct{}{}
}()

select {
case <-done:
case <-time.After(5 * time.Second):
klog.Warningf("timed out draining the queue on shut down")
}
}
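
Two smaller changes in watcher.go above are easy to miss: the per-node filter is now built with fields.Set rather than a hand-formatted string (the API machinery constructs and escapes the selector), and the queue drain on shutdown is bounded by a deadline. The sketch below covers only the selector half; the buildNodeFilter helper and the "worker-1" node name are illustrative, not part of Kepler.

package watcherdemo

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
)

// buildNodeFilter returns a ListOptions modifier that limits a pod watch to a
// single node, mirroring the optionsModifier used in watcher.go.
func buildNodeFilter(nodeName string) func(*metav1.ListOptions) {
	// fields.Set builds and escapes the selector instead of relying on a raw
	// fmt.Sprintf string being syntactically valid.
	selector := fields.Set{"spec.nodeName": nodeName}.AsSelector().String()
	return func(options *metav1.ListOptions) {
		options.FieldSelector = selector
	}
}

func exampleSelector() {
	opts := metav1.ListOptions{}
	buildNodeFilter("worker-1")(&opts)
	fmt.Println(opts.FieldSelector) // spec.nodeName=worker-1
}
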
14 changes: 12 additions & 2 deletions pkg/manager/manager.go
@@ -24,6 +24,7 @@ import (
"github.com/sustainable-computing-io/kepler/pkg/config"
"github.com/sustainable-computing-io/kepler/pkg/kubernetes"
exporter "github.com/sustainable-computing-io/kepler/pkg/metrics"
"k8s.io/klog/v2"
)

var (
@@ -42,6 +43,7 @@
}

func New(bpfExporter bpf.Exporter) *CollectorManager {
var err error
manager := &CollectorManager{}
supportedMetrics := bpfExporter.SupportedMetrics()
manager.StatsCollector = collector.NewCollector(bpfExporter)
@@ -52,10 +54,14 @@
manager.PrometheusCollector.NewVMCollector(manager.StatsCollector.VMStats)
manager.PrometheusCollector.NewNodeCollector(&manager.StatsCollector.NodeStats)
// configure the watcher
manager.Watcher = kubernetes.NewObjListWatcher(supportedMetrics)
if manager.Watcher, err = kubernetes.NewObjListWatcher(supportedMetrics); err != nil {
klog.Errorf("could not create the watcher, %v", err)
}
manager.Watcher.Mx = &manager.PrometheusCollector.Mx
manager.Watcher.ContainerStats = manager.StatsCollector.ContainerStats
manager.Watcher.Run()
if err = manager.Watcher.Run(); err != nil {
klog.Errorf("could not run the watcher %v", err)
}
return manager
}

@@ -79,3 +85,7 @@

return nil
}

func (m *CollectorManager) Stop() {
m.Watcher.ShutDownWithDrain()
}
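
CollectorManager.Stop delegates to the watcher's ShutDownWithDrain wrapper shown earlier. The deadline exists because the workqueue's ShutDownWithDrain blocks until every in-flight item is marked Done, so a stuck worker could otherwise hang shutdown forever. Below is a self-contained sketch of that behaviour with an illustrative two-second deadline; it restates the pattern, it is not code from the commit.

package watcherdemo

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// drainWithDeadline drains the queue but gives up after d, the same pattern
// ObjListWatcher.ShutDownWithDrain uses with its five-second timeout.
func drainWithDeadline(q workqueue.RateLimitingInterface, d time.Duration) {
	done := make(chan struct{})
	go func() {
		q.ShutDownWithDrain() // blocks until all in-flight items are marked Done
		close(done)
	}()
	select {
	case <-done:
		fmt.Println("queue drained cleanly")
	case <-time.After(d):
		fmt.Println("timed out draining the queue on shut down")
	}
}

func exampleDrain() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	q.Add("pod-key")
	q.Get()                             // the key is now in flight and never marked Done
	drainWithDeadline(q, 2*time.Second) // hits the timeout branch after ~2s
}
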
