Skip to content

Commit

Permalink
config: add ignored fields
Browse files Browse the repository at this point in the history
Specify fields to skip sending object update. Will be applied to all objects.
If after removal of these fields from k8s object all remaining fields will be equal,
handler won't trigger sending update. Removing array elements is not supported.
For example,
```yaml
ignorefields:
  status:
  metadata:
    resourceVersion:
    managedFields:
```
will remove ".status", ".metadata.resourceVersion" and ".metadata.managedFields"
from k8s object before comparing old & new k8s objects.
  • Loading branch information
ingodwerust committed Oct 14, 2024
1 parent a97d5dd commit 104e945
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 22 deletions.
13 changes: 13 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ type Config struct {
// For watching specific namespace, leave it empty for watching all.
// this config is ignored when watching namespaces
Namespace string `json:"namespace,omitempty"`

// Specify fields to skip sending object update. Will be applied to all objects.
// If after removal of these fields from k8s object all remaining fields will be equal,
// handler won't trigger sending update. Removing array elements is not supported.
// For example,
// ignorefields:
// status:
// metadata:
// resourceVersion:
// managedFields:
// will remove ".status", ".metadata.resourceVersion" and ".metadata.managedFields"
// from k8s object before comparing old & new k8s objects.
IgnoredFields map[string]interface{} `json:"ignoredfields,omitempty"`
}

// Slack contains slack configuration
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.14

require (
github.com/fatih/structtag v1.2.0
github.com/google/go-cmp v0.6.0
github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135 // indirect
github.com/hashicorp/hcl v0.0.0-20171017181929-23c074d0eceb // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand Down
87 changes: 65 additions & 22 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/google/go-cmp/cmp"
)

const maxRetries = 5
Expand Down Expand Up @@ -131,7 +133,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

allCoreEventsController := newResourceController(kubeClient, eventHandler, allCoreEventsInformer, objName(api_v1.Event{}), V1, kubewatchEventsMetrics)
allCoreEventsController := newResourceController(kubeClient, eventHandler, allCoreEventsInformer, objName(api_v1.Event{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopAllCoreEventsCh := make(chan struct{})
defer close(stopAllCoreEventsCh)

Expand All @@ -155,7 +157,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

allEventsController := newResourceController(kubeClient, eventHandler, allEventsInformer, objName(events_v1.Event{}), EVENTS_V1, kubewatchEventsMetrics)
allEventsController := newResourceController(kubeClient, eventHandler, allEventsInformer, objName(events_v1.Event{}), EVENTS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopAllEventsCh := make(chan struct{})
defer close(stopAllEventsCh)

Expand All @@ -177,7 +179,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Pod{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Pod{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -199,7 +201,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(autoscaling_v1.HorizontalPodAutoscaler{}), AUTOSCALING_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(autoscaling_v1.HorizontalPodAutoscaler{}), AUTOSCALING_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -222,7 +224,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.DaemonSet{}), APPS_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.DaemonSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -244,7 +246,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.StatefulSet{}), APPS_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.StatefulSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -266,7 +268,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.ReplicaSet{}), APPS_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.ReplicaSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -288,7 +290,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Service{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Service{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -310,7 +312,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.Deployment{}), APPS_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.Deployment{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -332,7 +334,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Namespace{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Namespace{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -354,7 +356,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ReplicationController{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ReplicationController{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -376,7 +378,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(batch_v1.Job{}), BATCH_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(batch_v1.Job{}), BATCH_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -398,7 +400,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Node{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Node{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -420,7 +422,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ServiceAccount{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ServiceAccount{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -442,7 +444,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRole{}), RBAC_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRole{}), RBAC_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -464,7 +466,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRoleBinding{}), RBAC_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRoleBinding{}), RBAC_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -486,7 +488,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.PersistentVolume{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.PersistentVolume{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -508,7 +510,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Secret{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Secret{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -530,7 +532,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ConfigMap{}), V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ConfigMap{}), V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -552,7 +554,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(networking_v1.Ingress{}), NETWORKING_V1, kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, objName(networking_v1.Ingress{}), NETWORKING_V1, kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand Down Expand Up @@ -583,7 +585,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, crd.Resource, fmt.Sprintf("%s/%s", crd.Group, crd.Version), kubewatchEventsMetrics)
c := newResourceController(kubeClient, eventHandler, informer, crd.Resource, fmt.Sprintf("%s/%s", crd.Group, crd.Version), kubewatchEventsMetrics, conf.IgnoredFields)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -596,7 +598,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
<-sigterm
}

func newResourceController(client kubernetes.Interface, eventHandler handlers.Handler, informer cache.SharedIndexInformer, resourceType string, apiVersion string, kubewatchEventsMetrics *prometheus.CounterVec) *Controller {
func newResourceController(client kubernetes.Interface, eventHandler handlers.Handler, informer cache.SharedIndexInformer, resourceType string, apiVersion string, kubewatchEventsMetrics *prometheus.CounterVec, ignoredFields map[string]interface{}) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
var newEvent Event
var err error
Expand Down Expand Up @@ -634,6 +636,15 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha
if !ok {
logrus.WithField("pkg", "kubewatch-"+resourceType).Errorf("cannot convert old to runtime.Object for update on %v", old)
}
if len(ignoredFields) > 0 {
diff, errDiff := diffObjects(old, new, ignoredFields)
if errDiff != nil {
logrus.WithField("pkg", "kubewatch-"+resourceType).Errorf("cannot diff old & new objects %v and %v: %v", old, new, errDiff)
} else if len(diff) == 0 {
logrus.WithField("pkg", "kubewatch-"+resourceType).Infof("Ignoring update to %v: %s", resourceType, newEvent.key)
return
}
}
logrus.WithField("pkg", "kubewatch-"+resourceType).Infof("Processing update to %v: %s", resourceType, newEvent.key)
if err == nil {
queue.Add(newEvent)
Expand Down Expand Up @@ -670,6 +681,38 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha
}
}

func diffObjects(old, new interface{}, ignoredFields map[string]interface{}) (string, error) {
oldContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(old)
if err != nil {
return "", err
}
newContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(new)
if err != nil {
return "", err
}
recursiveDelete(oldContent, ignoredFields)
recursiveDelete(newContent, ignoredFields)
return cmp.Diff(oldContent, newContent), nil
}

// recursiveDelete recursively removes key from object
// value of key should be either nil or nested map[string]interface{}
// value of object to delete from should be nested map[string]interface{}
func recursiveDelete(object map[string]interface{}, key map[string]interface{}) {
for k, v := range key {
if v == nil {
delete(object, k)
continue
}
if recursiveKey, ok := v.(map[string]interface{}); ok {
if recursiveObj, ok := object[k].(map[string]interface{}); ok {
recursiveDelete(recursiveObj, recursiveKey)
}
}
}
return
}

// Run starts the kubewatch controller
func (c *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
Expand Down

0 comments on commit 104e945

Please sign in to comment.