Skip to content

Commit

Permalink
support new event version
Browse files Browse the repository at this point in the history
  • Loading branch information
Gentleelephant committed Aug 7, 2024
1 parent 91e09c2 commit 080938e
Show file tree
Hide file tree
Showing 9 changed files with 377 additions and 7 deletions.
19 changes: 15 additions & 4 deletions cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"net/http"

"github.com/kubesphere/kube-events/pkg/exporter/events"

"github.com/julienschmidt/httprouter"
"github.com/kubesphere/kube-events/pkg/config"
"github.com/kubesphere/kube-events/pkg/exporter"
Expand All @@ -17,15 +19,17 @@ import (
)

var (
masterURL string
kubeconfig string
configFile string
masterURL string
kubeconfig string
configFile string
newEventType bool
)

func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&configFile, "config.file", "", "Event exporter configuration file path")
flag.BoolVar(&newEventType, "newEventType", false, "if true, exporter will use new event type")
}

func main() {
Expand All @@ -42,10 +46,17 @@ func main() {
klog.Fatal("Error building kubernetes clientset: ", e)
}

go util.SetClusterName(kclient)

ctx, cancel := context.WithCancel(context.Background())
wg, ctx := errgroup.WithContext(ctx)

kes := exporter.NewKubeEventSource(kclient)
var kes exporter.EventSource
if newEventType {
kes = events.NewKubeEventSource(kclient)
} else {
kes = exporter.NewKubeEventSource(kclient)
}
if e = reloadConfig(configFile, kes.ReloadConfig); e != nil {
klog.Fatal("Error loading config: ", e)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
k8s.io/apimachinery v0.27.4
k8s.io/client-go v12.0.0+incompatible
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.90.1
sigs.k8s.io/controller-runtime v0.14.6
sigs.k8s.io/yaml v1.4.0
)
Expand Down Expand Up @@ -114,7 +115,6 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.26.1 // indirect
k8s.io/component-base v0.26.1 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
12 changes: 12 additions & 0 deletions pkg/exporter/event_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package exporter

import (
"context"
"github.com/kubesphere/kube-events/pkg/config"
)

type EventSource interface {
ReloadConfig(c *config.ExporterConfig)

Run(ctx context.Context) error
}
215 changes: 215 additions & 0 deletions pkg/exporter/events/kube_events_exporter_new_version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package events

import (
"context"
"errors"
"fmt"
"github.com/kubesphere/kube-events/pkg/util"
"sync"

"github.com/kubesphere/kube-events/pkg/exporter/events/types"

"github.com/kubesphere/kube-events/pkg/config"
"github.com/kubesphere/kube-events/pkg/exporter/events/sinks"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)

const (
// maxRetries is the number of times an object will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
// an object is going to be requeued:
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
)

var maxBatchSize = 500

type K8sNewEventSource struct {
client *kubernetes.Clientset
workqueue workqueue.RateLimitingInterface
inf cache.SharedIndexInformer
sinkers []types.Sinker
mutex sync.Mutex

cluster string
}

func (s *K8sNewEventSource) ReloadConfig(c *config.ExporterConfig) {
s.mutex.Lock()
defer s.mutex.Unlock()

var sinkers []types.Sinker
if c == nil || c.Sinks == nil {
s.sinkers = sinkers
return
}

for _, w := range c.Sinks.Webhooks {
if w.Url != "" {
sinkers = append(sinkers, &sinks.WebhookSinker{Url: w.Url})
} else if w.Service != nil {
sinkers = append(sinkers, &sinks.WebhookSinker{Url: fmt.Sprintf("http://%s.%s.svc:%d/%s",
w.Service.Name, w.Service.Namespace, *w.Service.Port, w.Service.Path)})
}
}

if so := c.Sinks.Stdout; so != nil {
sinkers = append(sinkers, &sinks.StdoutSinker{})
}
s.sinkers = sinkers
}

func (s *K8sNewEventSource) getClusterName() string {
ns, err := s.client.CoreV1().Namespaces().Get(context.Background(), "kubesphere-system", metav1.GetOptions{})
if err != nil {
klog.Errorf("get namespace kubesphere-system error: %s", err)
return ""
}

if ns.Annotations != nil {
return ns.Annotations["cluster.kubesphere.io/name"]
}

return ""
}

func (s *K8sNewEventSource) getSinkers() []types.Sinker {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.sinkers[:]
}

func (s *K8sNewEventSource) Run(ctx context.Context) error {
defer s.workqueue.ShutDown()
go s.sinkEvents(ctx)
go s.inf.Run(ctx.Done())
if err := s.waitForCacheSync(ctx.Done()); err != nil {
return err
}

<-ctx.Done()
return ctx.Err()
}

func (s *K8sNewEventSource) waitForCacheSync(stopc <-chan struct{}) error {
if !cache.WaitForCacheSync(stopc, s.inf.HasSynced) {
return errors.New("failed to sync events cache")
}
klog.Info("Successfully synced events cache")
return nil
}

func (s *K8sNewEventSource) drainEvents() (evts []*eventsv1.Event, shutdown bool) {
var (
i = 0
m = s.workqueue.Len()
)
if m > maxBatchSize {
m = maxBatchSize
}
for {
var obj interface{}
obj, shutdown = s.workqueue.Get()
if obj != nil {
evts = append(evts, obj.(*eventsv1.Event))
}
i++
if i >= m {
break
}
}
return
}

func (s *K8sNewEventSource) sinkEvents(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
}
evts, shutdown := s.drainEvents()
if len(evts) == 0 {
if shutdown {
return
}
continue
}

func() {
var err error
defer func() {
for _, evt := range evts {
if err == nil {
s.workqueue.Forget(evt)
} else if numRequeues := s.workqueue.NumRequeues(evt); numRequeues >= maxRetries {
s.workqueue.Forget(evt)
klog.Infof("Dropping event %s/%s out of the queue because of failing %d times: %v\n",
evt.Namespace, evt.Name, numRequeues, err)
} else {
s.workqueue.AddRateLimited(evt)
}
s.workqueue.Done(evt)
}
}()

events := types.Events{}
for _, e := range evts {
events.KubeEvents = append(events.KubeEvents, &types.ExtendedEvent{
Event: e,
Cluster: util.GetCluster(),
})
}

evtSinkers := s.getSinkers()
if len(evtSinkers) == 0 {
return
}
for _, sinker := range evtSinkers {
if err = sinker.Sink(ctx, events); err != nil {
err = fmt.Errorf("error sinking events: %v", err)
klog.Error(err)
return
}
}
}()
}
}

func (s *K8sNewEventSource) enqueueEvent(obj interface{}) {
if obj == nil {
return
}
evt, ok := obj.(*eventsv1.Event)
if ok {
evt.SetManagedFields(nil) // set it nil because it is quite verbose
s.workqueue.Add(evt)
}
}

func NewKubeEventSource(client *kubernetes.Clientset) *K8sNewEventSource {
s := &K8sNewEventSource{
client: client,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "events"),
}
lw := cache.NewListWatchFromClient(client.EventsV1().RESTClient(),
"events", metav1.NamespaceAll, fields.Everything())
s.inf = cache.NewSharedIndexInformer(lw, &eventsv1.Event{}, 0, cache.Indexers{})
s.inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: s.enqueueEvent,
UpdateFunc: func(old, new interface{}) {
s.enqueueEvent(new)
},
})

s.cluster = util.GetCluster()

return s
}
23 changes: 23 additions & 0 deletions pkg/exporter/events/sinks/stdout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package sinks

import (
"context"
"encoding/json"
"fmt"

"github.com/kubesphere/kube-events/pkg/exporter/events/types"
)

type StdoutSinker struct {
}

func (s *StdoutSinker) Sink(ctx context.Context, evts types.Events) error {
for _, evt := range evts.KubeEvents {
bs, err := json.Marshal(evt)
if err != nil {
return err
}
fmt.Println(string(bs))
}
return nil
}
46 changes: 46 additions & 0 deletions pkg/exporter/events/sinks/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package sinks

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"

"github.com/kubesphere/kube-events/pkg/exporter/events/types"

"github.com/kubesphere/kube-events/pkg/util"
)

type WebhookSinker struct {
Url string
Cluster string
}

func (s *WebhookSinker) Sink(ctx context.Context, evts types.Events) error {
var buf bytes.Buffer
for _, evt := range evts.KubeEvents {
if bs, err := json.Marshal(evt); err != nil {
return err
} else if _, err := buf.Write(bs); err != nil {
return err
} else if err := buf.WriteByte('\n'); err != nil {
return err
}
}

req, err := http.NewRequest("POST", s.Url, &buf)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return fmt.Errorf("error sinking to webhook(%s): %v", s.Url, err)
}
util.DrainResponse(resp)
if resp.StatusCode/100 != 2 {
return fmt.Errorf("error sinking to webhook(%s): bad response status: %s", s.Url, resp.Status)
}
return nil
}
20 changes: 20 additions & 0 deletions pkg/exporter/events/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package types

import (
"context"

v1 "k8s.io/api/events/v1"
)

type Events struct {
KubeEvents []*ExtendedEvent `json:"kubeEvents"`
}

type ExtendedEvent struct {
*v1.Event `json:",inline"`
Cluster string `json:"cluster,omitempty"`
}

type Sinker interface {
Sink(ctx context.Context, events Events) error
}
Loading

0 comments on commit 080938e

Please sign in to comment.