From cce8a1793a7dc867e75060ae3aadb59f80f82978 Mon Sep 17 00:00:00 2001 From: wanjunlei Date: Tue, 14 Nov 2023 17:25:00 +0800 Subject: [PATCH] add cluster name to the event when export Signed-off-by: wanjunlei --- go.sum | 1 - pkg/exporter/kube_events_exporter.go | 20 ++++++++++++++++++-- pkg/exporter/sinks/webhook.go | 14 ++++++++++++-- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/go.sum b/go.sum index 85875b1..8bf97aa 100644 --- a/go.sum +++ b/go.sum @@ -1355,7 +1355,6 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= sigs.k8s.io/controller-runtime v0.14.6 h1:oxstGVvXGNnMvY7TAESYk+lzr6S3V5VFxQ6d92KcwQA= sigs.k8s.io/controller-runtime v0.14.6/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0= -sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= diff --git a/pkg/exporter/kube_events_exporter.go b/pkg/exporter/kube_events_exporter.go index 3cad4b5..288b257 100644 --- a/pkg/exporter/kube_events_exporter.go +++ b/pkg/exporter/kube_events_exporter.go @@ -30,6 +30,7 @@ const ( var maxBatchSize = 500 type K8sEventSource struct { + client *kubernetes.Clientset workqueue workqueue.RateLimitingInterface inf cache.SharedIndexInformer sinkers []types.Sinker @@ -48,9 +49,9 @@ func (s *K8sEventSource) ReloadConfig(c *config.ExporterConfig) { for _, w := range c.Sinks.Webhooks { if w.Url != "" { - sinkers = append(sinkers, &sinks.WebhookSinker{Url: w.Url}) + sinkers = append(sinkers, &sinks.WebhookSinker{Url: w.Url, Cluster: s.getClusterName()}) } else if w.Service != nil { - sinkers = append(sinkers, &sinks.WebhookSinker{Url: fmt.Sprintf("http://%s.%s.svc:%d/%s", + sinkers = append(sinkers, &sinks.WebhookSinker{Cluster: s.getClusterName(), Url: fmt.Sprintf("http://%s.%s.svc:%d/%s", w.Service.Name, w.Service.Namespace, *w.Service.Port, w.Service.Path)}) } } @@ -61,6 +62,20 @@ func (s *K8sEventSource) ReloadConfig(c *config.ExporterConfig) { s.sinkers = sinkers } +func (s *K8sEventSource) getClusterName() string { + ns, err := s.client.CoreV1().Namespaces().Get(context.Background(), "kubesphere-system", metav1.GetOptions{}) + if err != nil { + klog.Errorf("get namespace kubesphere-system error: %s", err) + return "" + } + + if ns.Annotations != nil { + return ns.Annotations["cluster.kubesphere.io/name"] + } + + return "" +} + func (s *K8sEventSource) getSinkers() []types.Sinker { s.mutex.Lock() defer s.mutex.Unlock() @@ -168,6 +183,7 @@ func (s *K8sEventSource) enqueueEvent(obj interface{}) { func NewKubeEventSource(client *kubernetes.Clientset) *K8sEventSource { s := &K8sEventSource{ + client: client, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "events"), } lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), diff --git a/pkg/exporter/sinks/webhook.go b/pkg/exporter/sinks/webhook.go index cb502e9..4da51e7 100644 --- a/pkg/exporter/sinks/webhook.go +++ b/pkg/exporter/sinks/webhook.go @@ -12,13 +12,23 @@ import ( ) type WebhookSinker struct { - Url string + Url string + Cluster string +} + +type extendedEvent struct { + *v1.Event `json:",inline"` + Cluster string `json:"cluster,omitempty"` } func (s *WebhookSinker) Sink(ctx context.Context, evts []*v1.Event) error { var buf bytes.Buffer for _, evt := range evts { - if bs, err := json.Marshal(evt); err != nil { + extendedEvt := extendedEvent{ + Event: evt, + Cluster: s.Cluster, + } + if bs, err := json.Marshal(extendedEvt); err != nil { return err } else if _, err := buf.Write(bs); err != nil { return err