Skip to content

Commit

Permalink
Initial check-in event watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Neha Manjunath committed Jul 13, 2023
1 parent 8ec979d commit f91356e
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 0 deletions.
114 changes: 114 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Package events provides events status for a namespace using kubectl.

Check failure on line 1 in events/events.go

View workflow job for this annotation

GitHub Actions / lint

: # github.com/openconfig/kne/events
package events

import (
"context"
"errors"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
)

// A EventStatus represents the status of a single Event.
type EventStatus struct {
Name string // name of event
UID types.UID
Namespace string
Event corev1.Event // copy of the raw event
}

func (p *EventStatus) String() string {
var buf strings.Builder
add := func(k, v string) {
if v != "" {
fmt.Fprintf(&buf, ", %s: %q", k, v)
}
}
fmt.Fprintf(&buf, "{Name: %q", p.Name)
add("UID", string(p.UID))
add("Namespace", p.Namespace)
return buf.String()
}

func (p *EventStatus) Equal(q *EventStatus) bool {
if p.UID != q.UID ||
p.Name != q.Name ||
p.Namespace != q.Namespace {
return false
}
return true
}

// Values for EventType. These constants are copied from k8s.io/api/core/v1 as a
// convenience.
const (
EventNormal = corev1.EventTypeNormal
EventWarning = corev1.EventTypeWarning
)

// GetEventStatus returns the status of the events found in the supplied namespace.
func GetEventStatus(ctx context.Context, client kubernetes.Interface, namespace string) ([]*EventStatus, error) {
events, err := client.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
var statuses []*EventStatus
for _, event := range events.Items {
event := event
statuses = append(statuses, EventToStatus(&event))
}
return statuses, nil
}

// WatchEventStatus returns a channel on which the status of the events in the
// supplied namespace are written.
func WatchEventStatus(ctx context.Context, client kubernetes.Interface, namespace string) (chan *EventStatus, func(), error) {
if ctx == nil {
return nil, nil, errors.New("WatchEventStatus: nil context ")
}
if client == nil {
return nil, nil, errors.New("WatchEventStatus: nil client ")
}
w, err := client.CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{})
if err != nil {
return nil, nil, err
}
kch := w.ResultChan()
ch := make(chan *EventStatus, 2)

go func() {
defer close(ch)
// seen is used to drop duplicate updates
seen := map[types.UID]*EventStatus{}
for event := range kch {
switch e := event.Object.(type) {
case *corev1.Event:
s := EventToStatus(e)
if os, ok := seen[s.UID]; ok {
if s.Equal(os) {
continue
}
}
seen[s.UID] = s
ch <- s
}
}
}()
return ch, w.Stop, nil
}

// EventToStatus returns a pointer to a new EventStatus for an event.
func EventToStatus(event *corev1.Event) *EventStatus {
s := EventStatus{
Name: event.ObjectMeta.Name,
Namespace: event.ObjectMeta.Namespace,
UID: event.ObjectMeta.UID,
}
event.DeepCopyInto(&s.Event)
event = &s.Event
return &s
}
119 changes: 119 additions & 0 deletions events/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package events

import (
"context"
"fmt"
"io"
"os"
"sync"
"time"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
log "k8s.io/klog/v2"
)

// A Watcher watches event updates.
type Watcher struct {
ctx context.Context
errCh chan error
wstop func()
cancel func()
eventStates map[types.UID]string
ch chan *EventStatus
stdout io.Writer
warningf func(string, ...any)

mu sync.Mutex
progress bool
currentNamespace string
currentEvent types.UID
}

// NewWatcher returns a Watcher on the provided client or an error. The cancel
// function is called when the Watcher determines an event has permanently
// failed. The Watcher will exit if the context provided is canceled, an error
// is encountered, or Cleanup is called.
func NewWatcher(ctx context.Context, client kubernetes.Interface, cancel func()) (*Watcher, error) {
ch, stop, err := WatchEventStatus(ctx, client, "")
if err != nil {
return nil, err
}
w := newWatcher(ctx, cancel, ch, stop)
go w.watch()
return w, nil
}

func newWatcher(ctx context.Context, cancel func(), ch chan *EventStatus, stop func()) *Watcher {
w := &Watcher{
ctx: ctx,
ch: ch,
wstop: stop,
cancel: cancel,
stdout: os.Stdout,
eventStates: map[types.UID]string{},
warningf: log.Warningf,
}
// A channel is used to record errors from the watcher to prevent any
// possible race conditions if Cleanup is called while an update is
// happening. At most one error will be written to the channel.
w.errCh = make(chan error, 1)
w.display("Displaying state changes for events")
return w
}

// SetProgress determins if progress output should be displayed while watching.
func (w *Watcher) SetProgress(value bool) {
w.mu.Lock()
w.progress = value
w.mu.Unlock()
}

func (w *Watcher) stop() {
w.mu.Lock()
stop := w.wstop
w.wstop = nil
w.mu.Unlock()
if stop != nil {
stop()
}
}

// Cleanup should be called when the Watcher is no longer needed. If the
// Watcher encountered an error the provided err is logged and the Watcher error
// is returned, otherwise err is returned.
func (w *Watcher) Cleanup(err error) error {
w.stop()
select {
case werr := <-w.errCh:
if err != nil {
w.warningf("Deploy() failed: %v", err)
}
w.warningf("Deployment failed: %v", werr)
return werr
default:
}
return err
}

func (w *Watcher) watch() {
defer w.stop()
for {
select {
case s, ok := <-w.ch:
if !ok || !w.updateEvent(s) {

Check failure on line 104 in events/status.go

View workflow job for this annotation

GitHub Actions / build

w.updateEvent undefined (type *Watcher has no field or method updateEvent)

Check failure on line 104 in events/status.go

View workflow job for this annotation

GitHub Actions / lint

w.updateEvent undefined (type *Watcher has no field or method updateEvent) (typecheck)
return
}
case <-w.ctx.Done():
return
}
}
}

var timeNow = func() string { return time.Now().Format("15:04:05 ") }

func (w *Watcher) display(format string, v ...any) {
if w.progress {
fmt.Fprintf(w.stdout, timeNow()+format+"\n", v...)
}
}

0 comments on commit f91356e

Please sign in to comment.