Skip to content

Commit

Permalink
add cluster name to the event when export
Browse files Browse the repository at this point in the history
Signed-off-by: wanjunlei <[email protected]>
  • Loading branch information
wanjunlei committed Nov 14, 2023
1 parent 7e1d867 commit cce8a17
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 5 deletions.
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1355,7 +1355,6 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/controller-runtime v0.14.6 h1:oxstGVvXGNnMvY7TAESYk+lzr6S3V5VFxQ6d92KcwQA=
sigs.k8s.io/controller-runtime v0.14.6/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
Expand Down
20 changes: 18 additions & 2 deletions pkg/exporter/kube_events_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
var maxBatchSize = 500

type K8sEventSource struct {
client *kubernetes.Clientset
workqueue workqueue.RateLimitingInterface
inf cache.SharedIndexInformer
sinkers []types.Sinker
Expand All @@ -48,9 +49,9 @@ func (s *K8sEventSource) ReloadConfig(c *config.ExporterConfig) {

for _, w := range c.Sinks.Webhooks {
if w.Url != "" {
sinkers = append(sinkers, &sinks.WebhookSinker{Url: w.Url})
sinkers = append(sinkers, &sinks.WebhookSinker{Url: w.Url, Cluster: s.getClusterName()})
} else if w.Service != nil {
sinkers = append(sinkers, &sinks.WebhookSinker{Url: fmt.Sprintf("http://%s.%s.svc:%d/%s",
sinkers = append(sinkers, &sinks.WebhookSinker{Cluster: s.getClusterName(), Url: fmt.Sprintf("http://%s.%s.svc:%d/%s",
w.Service.Name, w.Service.Namespace, *w.Service.Port, w.Service.Path)})
}
}
Expand All @@ -61,6 +62,20 @@ func (s *K8sEventSource) ReloadConfig(c *config.ExporterConfig) {
s.sinkers = sinkers
}

func (s *K8sEventSource) getClusterName() string {
ns, err := s.client.CoreV1().Namespaces().Get(context.Background(), "kubesphere-system", metav1.GetOptions{})
if err != nil {
klog.Errorf("get namespace kubesphere-system error: %s", err)
return ""
}

if ns.Annotations != nil {
return ns.Annotations["cluster.kubesphere.io/name"]
}

return ""
}

func (s *K8sEventSource) getSinkers() []types.Sinker {
s.mutex.Lock()
defer s.mutex.Unlock()
Expand Down Expand Up @@ -168,6 +183,7 @@ func (s *K8sEventSource) enqueueEvent(obj interface{}) {

func NewKubeEventSource(client *kubernetes.Clientset) *K8sEventSource {
s := &K8sEventSource{
client: client,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "events"),
}
lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(),
Expand Down
14 changes: 12 additions & 2 deletions pkg/exporter/sinks/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,23 @@ import (
)

type WebhookSinker struct {
Url string
Url string
Cluster string
}

type extendedEvent struct {
*v1.Event `json:",inline"`
Cluster string `json:"cluster,omitempty"`
}

func (s *WebhookSinker) Sink(ctx context.Context, evts []*v1.Event) error {
var buf bytes.Buffer
for _, evt := range evts {
if bs, err := json.Marshal(evt); err != nil {
extendedEvt := extendedEvent{
Event: evt,
Cluster: s.Cluster,
}
if bs, err := json.Marshal(extendedEvt); err != nil {
return err
} else if _, err := buf.Write(bs); err != nil {
return err
Expand Down

0 comments on commit cce8a17

Please sign in to comment.