diff --git a/cmd/exporter/main.go b/cmd/exporter/main.go
index 292718a..46eb4e2 100644
--- a/cmd/exporter/main.go
+++ b/cmd/exporter/main.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"net/http"
 
+	"github.com/kubesphere/kube-events/pkg/exporter/events"
+
 	"github.com/julienschmidt/httprouter"
 	"github.com/kubesphere/kube-events/pkg/config"
 	"github.com/kubesphere/kube-events/pkg/exporter"
@@ -17,15 +19,17 @@ import (
 )
 
 var (
-	masterURL  string
-	kubeconfig string
-	configFile string
+	masterURL    string
+	kubeconfig   string
+	configFile   string
+	newEventType bool
 )
 
 func init() {
 	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
 	flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
 	flag.StringVar(&configFile, "config.file", "", "Event exporter configuration file path")
+	flag.BoolVar(&newEventType, "newEventType", false, "if true, exporter will use new event type")
 }
 
 func main() {
@@ -42,10 +46,21 @@ func main() {
 		klog.Fatal("Error building kubernetes clientset: ", e)
 	}
 
+	// Keep the cluster name (read from the kubesphere-system namespace
+	// annotation) refreshed in the background; exported events are tagged with it.
+	go util.SetClusterName(kclient)
+
 	ctx, cancel := context.WithCancel(context.Background())
 	wg, ctx := errgroup.WithContext(ctx)
 
-	kes := exporter.NewKubeEventSource(kclient)
+	// Select the event source implementation: the events.k8s.io/v1 based
+	// source when -newEventType is set, the legacy corev1-based one otherwise.
+	var kes exporter.EventSource
+	if newEventType {
+		kes = events.NewKubeEventSource(kclient)
+	} else {
+		kes = exporter.NewKubeEventSource(kclient)
+	}
 	if e = reloadConfig(configFile, kes.ReloadConfig); e != nil {
 		klog.Fatal("Error loading config: ", e)
 	}
diff --git a/go.mod b/go.mod
index 14f726f..c16cf42 100644
--- a/go.mod
+++ b/go.mod
@@ -17,6 +17,7 @@ require (
 	k8s.io/apimachinery v0.27.4
 	k8s.io/client-go v12.0.0+incompatible
 	k8s.io/klog v1.0.0
+	k8s.io/klog/v2 v2.90.1
 	sigs.k8s.io/controller-runtime v0.14.6
 	sigs.k8s.io/yaml v1.4.0
 )
@@ -114,7 +115,6 @@ require (
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 	k8s.io/apiextensions-apiserver v0.26.1 // indirect
 	k8s.io/component-base v0.26.1 // indirect
-	k8s.io/klog/v2 v2.90.1 // indirect
 	k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
 	k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
 	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
diff --git a/pkg/exporter/event_interface.go b/pkg/exporter/event_interface.go
new file mode 100644
index 0000000..e82bdfa
--- /dev/null
+++ b/pkg/exporter/event_interface.go
+package exporter
+
+import (
+	"context"
+
+	"github.com/kubesphere/kube-events/pkg/config"
+)
+
+// EventSource is the contract shared by the legacy corev1-based exporter
+// and the new events.k8s.io/v1-based exporter, allowing main to select
+// either implementation at startup.
+type EventSource interface {
+	// ReloadConfig swaps in a new sink configuration. Implementations must
+	// tolerate being called while Run is active.
+	ReloadConfig(c *config.ExporterConfig)
+
+	// Run watches Kubernetes events and forwards them to the configured
+	// sinks until ctx is cancelled.
+	Run(ctx context.Context) error
+}
diff --git a/pkg/exporter/events/kube_events_exporter_new_version.go b/pkg/exporter/events/kube_events_exporter_new_version.go
new file mode 100644
index 0000000..495354e
--- /dev/null
+++ b/pkg/exporter/events/kube_events_exporter_new_version.go
+package events
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+
+	"github.com/kubesphere/kube-events/pkg/config"
+	"github.com/kubesphere/kube-events/pkg/exporter/events/sinks"
+	"github.com/kubesphere/kube-events/pkg/exporter/events/types"
+	"github.com/kubesphere/kube-events/pkg/util"
+	eventsv1 "k8s.io/api/events/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog"
+)
+
+const (
+	// maxRetries is the number of times an object will be retried before it is dropped out of the queue.
+	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
+	// an object is going to be requeued:
+	//
+	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
+	maxRetries = 15
+)
+
+// maxBatchSize caps how many events drainEvents pulls from the queue per batch.
+var maxBatchSize = 500
+
+// K8sNewEventSource watches events.k8s.io/v1 Events via a shared informer,
+// batches them through a rate-limited workqueue and forwards them to the
+// configured sinkers.
+type K8sNewEventSource struct {
+	client    *kubernetes.Clientset
+	workqueue workqueue.RateLimitingInterface
+	inf       cache.SharedIndexInformer
+	sinkers   []types.Sinker
+	mutex     sync.Mutex // guards sinkers
+}
+
+// ReloadConfig rebuilds the sinker list from c; a nil config or nil Sinks
+// clears all sinkers.
+func (s *K8sNewEventSource) ReloadConfig(c *config.ExporterConfig) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	var sinkers []types.Sinker
+	if c == nil || c.Sinks == nil {
+		s.sinkers = sinkers
+		return
+	}
+
+	for _, w := range c.Sinks.Webhooks {
+		if w.Url != "" {
+			sinkers = append(sinkers, &sinks.WebhookSinker{Url: w.Url})
+		} else if w.Service != nil {
+			// NOTE(review): assumes w.Service.Port is non-nil whenever Service
+			// is set — confirm config validation guarantees this.
+			sinkers = append(sinkers, &sinks.WebhookSinker{Url: fmt.Sprintf("http://%s.%s.svc:%d/%s",
+				w.Service.Name, w.Service.Namespace, *w.Service.Port, w.Service.Path)})
+		}
+	}
+
+	if so := c.Sinks.Stdout; so != nil {
+		sinkers = append(sinkers, &sinks.StdoutSinker{})
+	}
+	s.sinkers = sinkers
+}
+
+// getSinkers returns a snapshot of the current sinker list under the lock.
+// Callers must not mutate the returned slice's elements.
+func (s *K8sNewEventSource) getSinkers() []types.Sinker {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return s.sinkers[:]
+}
+
+// Run starts the sink loop and the informer, waits for the cache to sync,
+// then blocks until ctx is cancelled. The workqueue is shut down on return
+// so the sink loop can drain and exit.
+func (s *K8sNewEventSource) Run(ctx context.Context) error {
+	defer s.workqueue.ShutDown()
+	go s.sinkEvents(ctx)
+	go s.inf.Run(ctx.Done())
+	if err := s.waitForCacheSync(ctx.Done()); err != nil {
+		return err
+	}
+
+	<-ctx.Done()
+	return ctx.Err()
+}
+
+// waitForCacheSync blocks until the informer cache has synced or stopc closes.
+func (s *K8sNewEventSource) waitForCacheSync(stopc <-chan struct{}) error {
+	if !cache.WaitForCacheSync(stopc, s.inf.HasSynced) {
+		return errors.New("failed to sync events cache")
+	}
+	klog.Info("Successfully synced events cache")
+	return nil
+}
+
+// drainEvents pulls up to maxBatchSize events from the workqueue (at least
+// one Get is issued, so it blocks when the queue is empty). It returns early
+// when the queue is shutting down.
+func (s *K8sNewEventSource) drainEvents() (evts []*eventsv1.Event, shutdown bool) {
+	var (
+		i = 0
+		m = s.workqueue.Len()
+	)
+	if m > maxBatchSize {
+		m = maxBatchSize
+	}
+	for {
+		var obj interface{}
+		obj, shutdown = s.workqueue.Get()
+		if shutdown {
+			// Queue is shutting down: stop draining instead of spinning
+			// through the remaining batch size.
+			return
+		}
+		if obj != nil {
+			evts = append(evts, obj.(*eventsv1.Event))
+		}
+		i++
+		if i >= m {
+			break
+		}
+	}
+	return
+}
+
+// sinkEvents drains batches from the workqueue and delivers them to every
+// configured sinker, requeueing failed batches with backoff until maxRetries
+// is exceeded.
+func (s *K8sNewEventSource) sinkEvents(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+		evts, shutdown := s.drainEvents()
+		if len(evts) == 0 {
+			if shutdown {
+				return
+			}
+			continue
+		}
+
+		func() {
+			var err error
+			// Ack every drained event exactly once: forget on success (or
+			// after too many retries), otherwise requeue with rate limiting.
+			defer func() {
+				for _, evt := range evts {
+					if err == nil {
+						s.workqueue.Forget(evt)
+					} else if numRequeues := s.workqueue.NumRequeues(evt); numRequeues >= maxRetries {
+						s.workqueue.Forget(evt)
+						klog.Infof("Dropping event %s/%s out of the queue because of failing %d times: %v\n",
+							evt.Namespace, evt.Name, numRequeues, err)
+					} else {
+						s.workqueue.AddRateLimited(evt)
+					}
+					s.workqueue.Done(evt)
+				}
+			}()
+
+			events := types.Events{}
+			for _, e := range evts {
+				events.KubeEvents = append(events.KubeEvents, &types.ExtendedEvent{
+					Event:   e,
+					Cluster: util.GetCluster(),
+				})
+			}
+
+			evtSinkers := s.getSinkers()
+			if len(evtSinkers) == 0 {
+				return
+			}
+			for _, sinker := range evtSinkers {
+				if err = sinker.Sink(ctx, events); err != nil {
+					err = fmt.Errorf("error sinking events: %v", err)
+					klog.Error(err)
+					return
+				}
+			}
+		}()
+	}
+}
+
+// enqueueEvent adds an informer-observed Event to the workqueue.
+func (s *K8sNewEventSource) enqueueEvent(obj interface{}) {
+	if obj == nil {
+		return
+	}
+	evt, ok := obj.(*eventsv1.Event)
+	if !ok {
+		return
+	}
+	// Work on a copy: obj is owned by the informer cache and must not be
+	// mutated in place.
+	evt = evt.DeepCopy()
+	evt.SetManagedFields(nil) // set it nil because it is quite verbose
+	s.workqueue.Add(evt)
+}
+
+// NewKubeEventSource builds an event source that watches events.k8s.io/v1
+// Events across all namespaces and fans them out to the configured sinkers.
+func NewKubeEventSource(client *kubernetes.Clientset) *K8sNewEventSource {
+	s := &K8sNewEventSource{
+		client:    client,
+		workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "events"),
+	}
+	lw := cache.NewListWatchFromClient(client.EventsV1().RESTClient(),
+		"events", metav1.NamespaceAll, fields.Everything())
+	s.inf = cache.NewSharedIndexInformer(lw, &eventsv1.Event{}, 0, cache.Indexers{})
+	s.inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: s.enqueueEvent,
+		UpdateFunc: func(old, new interface{}) {
+			s.enqueueEvent(new)
+		},
+	})
+
+	return s
+}
diff --git a/pkg/exporter/events/sinks/stdout.go b/pkg/exporter/events/sinks/stdout.go
new file mode 100644
index 0000000..c031b06
--- /dev/null
+++ b/pkg/exporter/events/sinks/stdout.go
+package sinks
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+
+	"github.com/kubesphere/kube-events/pkg/exporter/events/types"
+)
+
+// StdoutSinker writes events to standard output, one JSON object per line.
+type StdoutSinker struct {
+}
+
+// Sink marshals each event and prints it on its own line.
+func (s *StdoutSinker) Sink(ctx context.Context, evts types.Events) error {
+	for _, evt := range evts.KubeEvents {
+		bs, err := json.Marshal(evt)
+		if err != nil {
+			return err
+		}
+		fmt.Println(string(bs))
+	}
+	return nil
+}
diff --git a/pkg/exporter/events/sinks/webhook.go b/pkg/exporter/events/sinks/webhook.go
new file mode 100644
index 0000000..acfee83
--- /dev/null
+++ b/pkg/exporter/events/sinks/webhook.go
+package sinks
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+
+	"github.com/kubesphere/kube-events/pkg/exporter/events/types"
+	"github.com/kubesphere/kube-events/pkg/util"
+)
+
+// WebhookSinker delivers event batches to an HTTP webhook endpoint.
+type WebhookSinker struct {
+	Url string
+	// Cluster is never assigned by ReloadConfig; the cluster name is carried
+	// on each event instead. NOTE(review): candidate for removal.
+	Cluster string
+}
+
+// Sink POSTs the events to the webhook URL as newline-delimited JSON.
+func (s *WebhookSinker) Sink(ctx context.Context, evts types.Events) error {
+	var buf bytes.Buffer
+	for _, evt := range evts.KubeEvents {
+		if bs, err := json.Marshal(evt); err != nil {
+			return err
+		} else if _, err := buf.Write(bs); err != nil {
+			return err
+		} else if err := buf.WriteByte('\n'); err != nil {
+			return err
+		}
+	}
+
+	req, err := http.NewRequestWithContext(ctx, "POST", s.Url, &buf)
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return fmt.Errorf("error sinking to webhook(%s): %v", s.Url, err)
+	}
+	util.DrainResponse(resp)
+	if resp.StatusCode/100 != 2 {
+		return fmt.Errorf("error sinking to webhook(%s): bad response status: %s", s.Url, resp.Status)
+	}
+	return nil
+}
diff --git a/pkg/exporter/events/types/types.go b/pkg/exporter/events/types/types.go
new file mode 100644
index 0000000..642a7ea
--- /dev/null
+++ b/pkg/exporter/events/types/types.go
+package types
+
+import (
+	"context"
+
+	v1 "k8s.io/api/events/v1"
+)
+
+// Events is the batch payload handed to sinkers.
+type Events struct {
+	KubeEvents []*ExtendedEvent `json:"kubeEvents"`
+}
+
+// ExtendedEvent decorates an events.k8s.io/v1 Event with the name of the
+// cluster it originated from.
+type ExtendedEvent struct {
+	*v1.Event `json:",inline"`
+	Cluster   string `json:"cluster,omitempty"`
+}
+
+// Sinker delivers a batch of events to some destination.
+type Sinker interface {
+	Sink(ctx context.Context, events Events) error
+}
diff --git a/pkg/exporter/kube_events_exporter.go b/pkg/exporter/kube_events_exporter.go
index dfd06ba..0458e23 100644
--- a/pkg/exporter/kube_events_exporter.go
+++ b/pkg/exporter/kube_events_exporter.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/kubesphere/kube-events/pkg/util"
 	"sync"
 
 	"github.com/kubesphere/kube-events/pkg/config"
@@ -162,7 +163,7 @@ func (s *K8sEventSource) sinkEvents(ctx context.Context) {
 	for _, e := range evts {
 		events.KubeEvents = append(events.KubeEvents, &types.ExtendedEvent{
 			Event:   e,
-			Cluster: s.cluster,
+			Cluster: util.GetCluster(),
 		})
 	}
 
@@ -207,7 +208,7 @@ func NewKubeEventSource(client *kubernetes.Clientset) *K8sEventSource {
 		},
 	})
 
-	s.cluster = s.getClusterName()
+	s.cluster = util.GetCluster()
 
 	return s
 }
diff --git a/pkg/util/annoutil.go b/pkg/util/annoutil.go
new file mode 100644
index 0000000..262b869
--- /dev/null
+++ b/pkg/util/annoutil.go
+package util
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/klog/v2"
+)
+
+// cluster holds the most recently observed cluster name. It is written by
+// the SetClusterName refresh goroutine and read concurrently through
+// GetCluster, so access is guarded by clusterMu.
+var (
+	clusterMu sync.RWMutex
+	cluster   string
+)
+
+// SetClusterName resolves the cluster name immediately, then refreshes it
+// every 60 seconds, logging the current value after each refresh. It never
+// returns; run it in its own goroutine.
+func SetClusterName(client *kubernetes.Clientset) {
+	setCluster(client)
+	ticker := time.NewTicker(60 * time.Second)
+	defer ticker.Stop()
+	for range ticker.C {
+		setCluster(client)
+		klog.Infof("current cluster is [%s]", GetCluster())
+	}
+}
+
+// setCluster reads the "cluster.kubesphere.io/name" annotation from the
+// kubesphere-system namespace and caches it. On lookup failure the error is
+// logged and the previously cached value is kept.
+func setCluster(client *kubernetes.Clientset) {
+	ns, err := client.CoreV1().Namespaces().Get(context.Background(), "kubesphere-system", metav1.GetOptions{})
+	if err != nil {
+		klog.Errorf("get namespace kubesphere-system error: %s", err)
+		return
+	}
+	if ns.Annotations == nil {
+		return
+	}
+	clusterMu.Lock()
+	cluster = ns.Annotations["cluster.kubesphere.io/name"]
+	clusterMu.Unlock()
+}
+
+// GetCluster returns the cached cluster name ("" if not yet resolved).
+func GetCluster() string {
+	clusterMu.RLock()
+	defer clusterMu.RUnlock()
+	return cluster
+}