Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event watcher for topo create #399

Merged
merged 11 commits into from
Jul 25, 2023
118 changes: 118 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Package events provides events status for a namespace using kubectl.
NehaManjunath marked this conversation as resolved.
Show resolved Hide resolved
package events

import (
"context"
"errors"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
)

// A EventStatus represents the status of a single Event.
type EventStatus struct {
Name string // name of event
UID types.UID
Namespace string
Message string
Type string
Event corev1.Event // copy of the raw event
}

func (e *EventStatus) String() string {
var buf strings.Builder
add := func(k, v string) {
if v != "" {
fmt.Fprintf(&buf, ", %s: %q", k, v)
}
}
fmt.Fprintf(&buf, "{Name: %q", e.Name)
add("UID", string(e.UID))
add("Namespace", e.Namespace)
return buf.String()
}

func (e *EventStatus) Equal(q *EventStatus) bool {
if e.UID != q.UID ||
e.Name != q.Name ||
e.Namespace != q.Namespace {
return false
}
return true
}

// Values for EventType. These constants are copied from k8s.io/api/core/v1 as a
// convenience.
const (
EventNormal = corev1.EventTypeNormal
EventWarning = corev1.EventTypeWarning
)

// GetEventStatus returns the status of the events found in the supplied namespace.
func GetEventStatus(ctx context.Context, client kubernetes.Interface, namespace string) ([]*EventStatus, error) {
events, err := client.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
var statuses []*EventStatus
for _, event := range events.Items {
event := event
statuses = append(statuses, EventToStatus(&event))
}
return statuses, nil
}

// WatchEventStatus returns a channel on which the status of the events in the
// supplied namespace are written.
func WatchEventStatus(ctx context.Context, client kubernetes.Interface, namespace string) (chan *EventStatus, func(), error) {
if ctx == nil {
return nil, nil, errors.New("WatchEventStatus: nil context ")
}
if client == nil {
return nil, nil, errors.New("WatchEventStatus: nil client ")
}
w, err := client.CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{})
if err != nil {
return nil, nil, err
}
kch := w.ResultChan()
ch := make(chan *EventStatus, 2)

go func() {
defer close(ch)
// seen is used to drop duplicate updates
seen := map[types.UID]*EventStatus{}
for event := range kch {
switch e := event.Object.(type) {
case *corev1.Event:
s := EventToStatus(e)
if os, ok := seen[s.UID]; ok {
if s.Equal(os) {
continue
}
}
seen[s.UID] = s
ch <- s
}
}
}()
return ch, w.Stop, nil
}

// EventToStatus returns a pointer to a new EventStatus for an event.
func EventToStatus(event *corev1.Event) *EventStatus {
s := EventStatus{
Name: event.ObjectMeta.Name,
Namespace: event.ObjectMeta.Namespace,
UID: event.ObjectMeta.UID,
}
event.DeepCopyInto(&s.Event)
event = &s.Event
s.Type = event.DeepCopy().Type
s.Message = event.DeepCopy().Message
NehaManjunath marked this conversation as resolved.
Show resolved Hide resolved
return &s
}
141 changes: 141 additions & 0 deletions events/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package events

import (
"context"
"fmt"
"io"
"os"
"strings"
"sync"
"time"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
log "k8s.io/klog/v2"
)

// A Watcher watches event updates.
type Watcher struct {
ctx context.Context
errCh chan error
wstop func()
cancel func()
eventStates map[types.UID]string
ch chan *EventStatus
stdout io.Writer
warningf func(string, ...any)

mu sync.Mutex
progress bool
currentNamespace string
currentEvent types.UID

Check failure on line 31 in events/status.go

View workflow job for this annotation

GitHub Actions / lint

field `currentEvent` is unused (unused)
NehaManjunath marked this conversation as resolved.
Show resolved Hide resolved
}

var errorMsgs = [2]string{"Insufficient memory", "Insufficient cpu"}

// NewWatcher returns a Watcher on the provided client or an error. The cancel
// function is called when the Watcher determines an event has permanently
// failed. The Watcher will exit if the context provided is canceled, an error
// is encountered, or Cleanup is called.
func NewWatcher(ctx context.Context, client kubernetes.Interface, cancel func()) (*Watcher, error) {
ch, stop, err := WatchEventStatus(ctx, client, "")
if err != nil {
return nil, err
}
w := newWatcher(ctx, cancel, ch, stop)
go w.watch()
return w, nil
}

func newWatcher(ctx context.Context, cancel func(), ch chan *EventStatus, stop func()) *Watcher {
w := &Watcher{
ctx: ctx,
ch: ch,
wstop: stop,
cancel: cancel,
stdout: os.Stdout,
eventStates: map[types.UID]string{},
warningf: log.Warningf,
}
// A channel is used to record errors from the watcher to prevent any
// possible race conditions if Cleanup is called while an update is
// happening. At most one error will be written to the channel.
w.errCh = make(chan error, 1)
w.display("Displaying state changes for events")
return w
}

// SetProgress determins if progress output should be displayed while watching.
func (w *Watcher) SetProgress(value bool) {
w.mu.Lock()
w.progress = value
w.mu.Unlock()
}

func (w *Watcher) stop() {
w.mu.Lock()
stop := w.wstop
w.wstop = nil
w.mu.Unlock()
if stop != nil {
stop()
}
}

// Cleanup should be called when the Watcher is no longer needed. If the
// Watcher encountered an error the provided err is logged and the Watcher error
// is returned, otherwise err is returned.
func (w *Watcher) Cleanup(err error) error {
w.stop()
select {
case werr := <-w.errCh:
if err != nil {
w.warningf("Deploy() failed: %v", err)
}
w.warningf("Deployment failed: %v", werr)
return werr
default:
}
return err
}

func (w *Watcher) watch() {
defer w.stop()
for {
select {
case s, ok := <-w.ch:
if !ok || !w.displayEvent(s) {
return
}
case <-w.ctx.Done():
return
}
}
}

var timeNow = func() string { return time.Now().Format("15:04:05 ") }

func (w *Watcher) display(format string, v ...any) {
if w.progress {
fmt.Fprintf(w.stdout, timeNow()+format+"\n", v...)
}
}

func (w *Watcher) displayEvent(s *EventStatus) bool {
w.display("NS: %s", s.Namespace)
w.display("Event: %s", s.Name)
w.display("EventType: %s", s.Type)
w.display("Event message: %s", s.Message)

message := s.Message
for _, m := range errorMsgs {
// Error out if namespace is currentnamesapce and message contains predefinedcheck namespace
if w.currentNamespace == s.Namespace && strings.Contains(message, m) {
w.errCh <- fmt.Errorf("Event failed due to %s . Message: %s", s.Event.Reason, message)
w.cancel()
return false
}
}

return true
}
Loading
Loading