diff --git a/events/data_test.go b/events/data_test.go new file mode 100644 index 00000000..ad2a65a0 --- /dev/null +++ b/events/data_test.go @@ -0,0 +1,218 @@ +package events + +import ( + "encoding/json" + + corev1 "k8s.io/api/core/v1" +) + +// Variables are named +// +// name#data Raw json from kubectl +// name#event Data converted into a corev1.Event +// name#status EventStatus version +// name#string EventStatus as a string + +func json2event(j string) *corev1.Event { + var event corev1.Event + if err := json.Unmarshal(([]byte)(j), &event); err != nil { + panic(err) + } + return &event +} + +var ( + normal1event = json2event(normal1eventdata) + warning1event = json2event(warning1eventdata) + warning2event = json2event(warning2eventdata) +) + +// We only use 3 events when checking EventStatus.String as these cover all the cases. +var ( + normal1eventstring = `{Name: "service-r2.1772fc5571f425ff", UID: "a756a650-bf61-4da6-8d69-d5ae40bd943c", Namespace: "ceos2"}` + warning1eventstring = `{Name: "r1.177351c307a38f38", UID: "a0450cc2-26e5-46e6-8ef8-be9dec02cd35", Namespace: "ceos1"}` + warning2eventstring = `{Name: "r1.17735337a69ffd33", UID: "efa6a664-ccf3-4ba8-a30b-09e0ebeaa269", Namespace: "ceos6"}` +) + +var ( + normal1eventdata = ` + { + "metadata": { + "name": "service-r2.1772fc5571f425ff", + "namespace": "ceos2", + "uid": "a756a650-bf61-4da6-8d69-d5ae40bd943c", + "resourceVersion": "3243", + "creationTimestamp": "2023-07-18T14:24:14Z", + "managedFields": [ + { + "manager": "speaker", + "operation": "Update", + "apiVersion": "v1", + "time": "2023-07-18T14:24:14Z", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:count": {}, + "f:firstTimestamp": {}, + "f:involvedObject": {}, + "f:lastTimestamp": {}, + "f:message": {}, + "f:reason": {}, + "f:source": { + "f:component": {} + }, + "f:type": {} + } + } + ] + }, + "involvedObject": { + "kind": "Service", + "namespace": "ceos2", + "name": "service-r2", + "uid": "2459a3c5-c353-4a50-aa70-dc897ffb051e", + "apiVersion": "v1", + "resourceVersion": "3076" + }, + "reason": "nodeAssigned", + "message": "announcing from node \"kne-control-plane\" with protocol \"layer2\"", + "source": { + "component": "metallb-speaker" + }, + "firstTimestamp": "2023-07-18T14:24:14Z", + "lastTimestamp": "2023-07-18T14:24:14Z", + "count": 1, + "type": "Normal", + "eventTime": null, + "reportingComponent": "", + "reportingInstance": "" + } + ` + + normal1eventstatus = EventStatus{ + Name: "service-r2.1772fc5571f425ff", + UID: "a756a650-bf61-4da6-8d69-d5ae40bd943c", + Namespace: "ceos2", + Message: "announcing from node \"kne-control-plane\" with protocol \"layer2\"", + Type: "Normal", + } + + warning1eventdata = ` + { + "metadata": { + "name": "r1.177351c307a38f38", + "namespace": "ceos1", + "uid": "a0450cc2-26e5-46e6-8ef8-be9dec02cd35", + "resourceVersion": "325871", + "creationTimestamp": "2023-07-19T16:29:43Z", + "managedFields": [ + { + "manager": "kube-scheduler", + "operation": "Update", + "apiVersion": "v1", + "time": "2023-07-19T16:39:50Z", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:count": {}, + "f:firstTimestamp": {}, + "f:involvedObject": {}, + "f:lastTimestamp": {}, + "f:message": {}, + "f:reason": {}, + "f:source": { + "f:component": {} + }, + "f:type": {} + } + } + ] + }, + "involvedObject": { + "kind": "Pod", + "namespace": "ceos1", + "name": "r1", + "uid": "b69a3fd5-2f25-42aa-91c4-146c40e53053", + "apiVersion": "v1", + "resourceVersion": "323358" + }, + "reason": "FailedScheduling", + "message": "0/1 nodes are available: 1 Insufficient cpu. 
preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + "source": { + "component": "default-scheduler" + }, + "firstTimestamp": "2023-07-19T16:29:43Z", + "lastTimestamp": "2023-07-19T16:39:50Z", + "count": 3, + "type": "Warning", + "eventTime": null, + "reportingComponent": "", + "reportingInstance": "" + } + ` + warning1eventstatus = EventStatus{ + Name: "r1.177351c307a38f38", + UID: "a0450cc2-26e5-46e6-8ef8-be9dec02cd35", + Namespace: "ceos1", + Message: "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + Type: "Warning", + } + + warning2eventdata = ` + { + "metadata": { + "name": "r1.17735337a69ffd33", + "namespace": "ceos6", + "uid": "efa6a664-ccf3-4ba8-a30b-09e0ebeaa269", + "resourceVersion": "1621", + "creationTimestamp": "2023-07-19T16:56:23Z", + "managedFields": [ + { + "manager": "kube-scheduler", + "operation": "Update", + "apiVersion": "v1", + "time": "2023-07-19T16:56:23Z", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:count": {}, + "f:firstTimestamp": {}, + "f:involvedObject": {}, + "f:lastTimestamp": {}, + "f:message": {}, + "f:reason": {}, + "f:source": { + "f:component": {} + }, + "f:type": {} + } + } + ] + }, + "involvedObject": { + "kind": "Pod", + "namespace": "ceos6", + "name": "r1", + "uid": "140da1fb-0ff4-4b9d-b512-41c64bd9b1e1", + "apiVersion": "v1", + "resourceVersion": "1620" + }, + "reason": "FailedScheduling", + "message": "0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + "source": { + "component": "default-scheduler" + }, + "firstTimestamp": "2023-07-19T16:56:23Z", + "lastTimestamp": "2023-07-19T16:56:23Z", + "count": 1, + "type": "Warning", + "eventTime": null, + "reportingComponent": "", + "reportingInstance": "" + } + ` + warning2eventstatus = EventStatus{ + Name: "r1.17735337a69ffd33", + UID: "efa6a664-ccf3-4ba8-a30b-09e0ebeaa269", + Namespace: "ceos6", + Message: "0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + Type: "Warning", + } +) diff --git a/events/events.go b/events/events.go new file mode 100644 index 00000000..15ee0cec --- /dev/null +++ b/events/events.go @@ -0,0 +1,114 @@ +// Package events provides events status for a namespace. +package events + +import ( + "context" + "errors" + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" +) + +// A EventStatus represents the status of a single Event. +type EventStatus struct { + Name string // name of event + UID types.UID + Namespace string + Message string + Type string + Event corev1.Event // copy of the raw event +} + +func (e *EventStatus) String() string { + var buf strings.Builder + add := func(k, v string) { + if v != "" { + fmt.Fprintf(&buf, ", %s: %q", k, v) + } + } + fmt.Fprintf(&buf, "{Name: %q", e.Name) + add("UID", string(e.UID)) + add("Namespace", e.Namespace) + fmt.Fprint(&buf, "}") + return buf.String() +} + +func (e *EventStatus) Equal(q *EventStatus) bool { + if e.UID != q.UID || + e.Name != q.Name || + e.Namespace != q.Namespace { + return false + } + return true +} + +// Values for EventType. These constants are copied from k8s.io/api/core/v1 as a +// convenience. 
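+// For example, code consuming EventStatus values can check s.Type == EventWarning
+// without importing k8s.io/api/core/v1 itself.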
+const ( + EventNormal = corev1.EventTypeNormal + EventWarning = corev1.EventTypeWarning +) + +// GetEventStatus returns the status of the events found in the supplied namespace. +func GetEventStatus(ctx context.Context, client kubernetes.Interface, namespace string) ([]*EventStatus, error) { + events, err := client.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + var statuses []*EventStatus + for _, event := range events.Items { + event := event + statuses = append(statuses, EventToStatus(&event)) + } + return statuses, nil +} + +// WatchEventStatus returns a channel on which the status of the events in the +// supplied namespace are written. +func WatchEventStatus(ctx context.Context, client kubernetes.Interface, namespace string) (chan *EventStatus, func(), error) { + if ctx == nil { + return nil, nil, errors.New("WatchEventStatus: nil context ") + } + if client == nil { + return nil, nil, errors.New("WatchEventStatus: nil client ") + } + w, err := client.CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{}) + if err != nil { + return nil, nil, err + } + kch := w.ResultChan() + ch := make(chan *EventStatus, 2) + initialTimestamp := metav1.Now() + + go func() { + defer close(ch) + for event := range kch { + switch e := event.Object.(type) { + case *corev1.Event: + if !e.CreationTimestamp.Before(&initialTimestamp) { + s := EventToStatus(e) + ch <- s + } + } + } + }() + return ch, w.Stop, nil +} + +// EventToStatus returns a pointer to a new EventStatus for an event. +func EventToStatus(event *corev1.Event) *EventStatus { + s := EventStatus{ + Name: event.ObjectMeta.Name, + Namespace: event.ObjectMeta.Namespace, + UID: event.ObjectMeta.UID, + } + event.DeepCopyInto(&s.Event) + event = &s.Event + s.Type = event.Type + s.Message = event.Message + return &s +} diff --git a/events/events_test.go b/events/events_test.go new file mode 100644 index 00000000..a0cf928b --- /dev/null +++ b/events/events_test.go @@ -0,0 +1,218 @@ +package events + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/h-fam/errdiff" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + kfake "k8s.io/client-go/kubernetes/fake" + ktest "k8s.io/client-go/testing" +) + +func TestGetEventStatus(t *testing.T) { + client := kfake.NewSimpleClientset() + var eventUpdates = [][]*corev1.Event{ + {normal1event}, + {warning1event}, + {warning2event}, + } + + var eventGetUpdatesStatus = [][]EventStatus{ + {normal1eventstatus}, + {warning1eventstatus}, + {warning2eventstatus}, + } + + next := -1 + + client.PrependReactor("list", "events", func(action ktest.Action) (bool, runtime.Object, error) { + next++ + if next >= len(eventUpdates) { + return false, nil, nil + } + el := corev1.EventList{} + for _, e := range eventUpdates[next] { + el.Items = append(el.Items, *e) + } + return true, &el, nil + }) + + for i := range eventUpdates { + got, err := GetEventStatus(context.Background(), client, "") + if err != nil { + t.Fatal(err) + } + want := eventGetUpdatesStatus[i] + j := -1 + for len(got) > 0 { + j++ + gotevent := got[0] + got = got[1:] + if len(want) == 0 { + t.Errorf("%d-%d: extra event: %v", i, j, got) + continue + } + wantevent := want[0] + want = want[1:] + if !gotevent.Equal(&wantevent) { + t.Errorf("%d-%d FAIL:\ngot : %s\nwant: %s", i, j, gotevent.String(), wantevent.String()) + } else if false { + t.Logf("%d-%d okay:\ngot : 
%s\nwant: %s", i, j, gotevent.String(), wantevent.String()) + } + + if false { + if s := cmp.Diff(got, want); s != "" { + t.Errorf("update %d: %s\n", i, s) + } + } + } + for len(want) > 0 { + wantevent := want[0] + want = want[1:] + t.Errorf("%d: missing event: %v", i, wantevent) + } + } +} + +func TestGetEventStatusError(t *testing.T) { + myError := errors.New("event error") + client := kfake.NewSimpleClientset() + client.PrependReactor("list", "events", func(action ktest.Action) (bool, runtime.Object, error) { + return true, nil, myError + }) + + _, err := GetEventStatus(context.Background(), client, "") + if s := errdiff.Check(err, myError); s != "" { + t.Error(s) + } +} + +type fakeWatch struct { + ch chan watch.Event + done chan struct{} +} + +func (f *fakeWatch) Stop() { + close(f.done) +} + +func (f *fakeWatch) ResultChan() <-chan watch.Event { + return f.ch +} + +func TestWatchEventStatus(t *testing.T) { + if _, _, err := WatchEventStatus(nil, nil, ""); err == nil { //nolint:all + t.Errorf("WatchEventStatus does not return an error on a nil context.") + } + if _, _, err := WatchEventStatus(context.TODO(), nil, ""); err == nil { + t.Errorf("WatchEventStatus does not return an error on a nil client.") + } + client := kfake.NewSimpleClientset() + + var wanted = []*EventStatus{ + &normal1eventstatus, + &warning1eventstatus, + &warning2eventstatus, + } + var updates = []*corev1.Event{ + normal1event, + warning1event, + warning2event, + } + for _, u := range updates { + u.CreationTimestamp = metav1.NewTime(time.Now().Add(time.Minute)) + } + client.PrependWatchReactor("*", func(action ktest.Action) (bool, watch.Interface, error) { + f := &fakeWatch{ + ch: make(chan watch.Event, 1), + done: make(chan struct{}), + } + go func() { + kind := watch.Added + for _, u := range updates { + select { + case f.ch <- watch.Event{ + Type: kind, + Object: u, + }: + kind = watch.Modified + case <-f.done: + return + } + } + }() + return true, f, nil + }) + + ch, stop, err := WatchEventStatus(context.Background(), client, "") + if err != nil { + t.Fatal(err) + } + defer stop() + for i, want := range wanted { + got, ok := <-ch + if !ok { + t.Fatalf("channel closed early") + } + if !got.Equal(want) { + t.Fatalf("#%d\ngot : %v\nwant: %v", i, got, want) + } + } +} + +func TestWatchEventStatusError(t *testing.T) { + client := kfake.NewSimpleClientset() + myError := errors.New("watch error") + client.PrependWatchReactor("*", func(action ktest.Action) (bool, watch.Interface, error) { + f := &fakeWatch{ + ch: make(chan watch.Event, 1), + done: make(chan struct{}), + } + return true, f, myError + }) + + _, _, err := WatchEventStatus(context.Background(), client, "") + if s := errdiff.Check(err, myError); s != "" { + t.Error(s) + } +} + +func TestString(t *testing.T) { + for _, tt := range []struct { + name string + event *EventStatus + status string + }{ + {"event1", &EventStatus{Name: "event1", Namespace: "ns", UID: "event-1"}, `{Name: "event1", UID: "event-1", Namespace: "ns"}`}, + {"event2", &normal1eventstatus, normal1eventstring}, + {"event3", &warning1eventstatus, warning1eventstring}, + {"event4", &warning2eventstatus, warning2eventstring}, + } { + t.Run(tt.name, func(t *testing.T) { + status := tt.event.String() + if status != tt.status { + t.Errorf("Got/Want:\n%s\n%s", status, tt.status) + } + }) + } +} + +func TestEqual(t *testing.T) { + different := &EventStatus{} + for i, event := range []*EventStatus{&normal1eventstatus, &warning1eventstatus, &warning2eventstatus} { + lintEvent := event + if 
!event.Equal(lintEvent) { + t.Errorf("#%d: Equal returned false on equal events", i) + } + if event.Equal(different) { + t.Errorf("#%d: Equal returned true on different events", i) + } + } +} diff --git a/events/status.go b/events/status.go new file mode 100644 index 00000000..2df34829 --- /dev/null +++ b/events/status.go @@ -0,0 +1,136 @@ +package events + +import ( + "context" + "fmt" + "io" + "os" + "strings" + "sync" + "time" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + log "k8s.io/klog/v2" +) + +// A Watcher watches event updates. +type Watcher struct { + ctx context.Context + errCh chan error + wstop func() + cancel func() + eventStates map[types.UID]string + ch chan *EventStatus + stdout io.Writer + warningf func(string, ...any) + + mu sync.Mutex + progress bool +} + +var errorMsgs = [2]string{"Insufficient memory", "Insufficient cpu"} + +// NewWatcher returns a Watcher on the provided client or an error. The cancel +// function is called when the Watcher determines an event has permanently +// failed. The Watcher will exit if the context provided is canceled, an error +// is encountered, or Cleanup is called. +func NewWatcher(ctx context.Context, client kubernetes.Interface, cancel func()) (*Watcher, error) { + ch, stop, err := WatchEventStatus(ctx, client, "") + if err != nil { + return nil, err + } + w := newWatcher(ctx, cancel, ch, stop) + go w.watch() + return w, nil +} + +func newWatcher(ctx context.Context, cancel func(), ch chan *EventStatus, stop func()) *Watcher { + w := &Watcher{ + ctx: ctx, + ch: ch, + wstop: stop, + cancel: cancel, + stdout: os.Stdout, + eventStates: map[types.UID]string{}, + warningf: log.Warningf, + } + // A channel is used to record errors from the watcher to prevent any + // possible race conditions if Cleanup is called while an update is + // happening. At most one error will be written to the channel. + w.errCh = make(chan error, 1) + w.display("Displaying state changes for events") + return w +} + +// SetProgress determins if progress output should be displayed while watching. +func (w *Watcher) SetProgress(value bool) { + w.mu.Lock() + w.progress = value + w.mu.Unlock() +} + +func (w *Watcher) stop() { + w.mu.Lock() + stop := w.wstop + w.wstop = nil + w.mu.Unlock() + if stop != nil { + stop() + } +} + +// Cleanup should be called when the Watcher is no longer needed. If the +// Watcher encountered an error the provided err is logged and the Watcher error +// is returned, otherwise err is returned. +func (w *Watcher) Cleanup(err error) error { + w.stop() + select { + case werr := <-w.errCh: + if err != nil { + w.warningf("Event Watcher failed: %v", err) + } + w.warningf("Event Watcher error: %v", werr) + return werr + default: + } + return err +} + +func (w *Watcher) watch() { + defer w.stop() + for { + select { + case s, ok := <-w.ch: + if !ok || !w.isEventNormal(s) { + return + } + case <-w.ctx.Done(): + return + } + } +} + +var timeNow = func() string { return time.Now().Format("15:04:05 ") } + +func (w *Watcher) display(format string, v ...any) { + if w.progress { + fmt.Fprintf(w.stdout, timeNow()+format+"\n", v...) + } +} + +func (w *Watcher) isEventNormal(s *EventStatus) bool { + w.display("NS: %s Event name: %s Type: %s Message: %s", s.Namespace, s.Name, s.Type, s.Message) + + message := s.Message + for _, m := range errorMsgs { + // Error out if message contains predefined message + if strings.Contains(message, m) { + w.errCh <- fmt.Errorf("Event failed due to %s . 
Message: %s", s.Event.Reason, message) + w.cancel() + return false + } + } + + return true +} diff --git a/events/status_test.go b/events/status_test.go new file mode 100644 index 00000000..89c3f743 --- /dev/null +++ b/events/status_test.go @@ -0,0 +1,311 @@ +package events + +import ( + "context" + "errors" + "fmt" + "os" + "strings" + "testing" + + "github.com/h-fam/errdiff" + "k8s.io/apimachinery/pkg/types" + kfake "k8s.io/client-go/kubernetes/fake" +) + +func init() { + _ = timeNow() + timeNow = func() string { return "01:23:45 " } +} + +func TestDisplay(t *testing.T) { + var buf strings.Builder + var w Watcher + w.stdout = &buf + w.SetProgress(false) + w.display("hello %s", "world") + if got := buf.String(); got != "" { + t.Errorf("display w/o progress got %q, want \"\"", got) + } + buf.Reset() + w.SetProgress(true) + w.display("hello %s", "world") + want := "01:23:45 hello world\n" + got := buf.String() + if got != want { + t.Errorf("display got %q, want %q", got, want) + } +} + +func TestStop(t *testing.T) { + canceled := false + cancel := func() { canceled = true } + stopped := false + stop := func() { stopped = true } + newWatcher(context.TODO(), cancel, nil, stop).stop() + if stopped != true { + t.Errorf("got stopped %v, want %v", stopped, true) + } + if canceled != false { + t.Errorf("got canceled %v, want %v", canceled, false) + } +} + +func TestCleanup(t *testing.T) { + var ( + error1 = errors.New("First Error") + error2 = errors.New("Second Error") + ) + for _, tt := range []struct { + name string + err error + werr error + want error + canceled bool + output string + }{ + { + name: "no_errors", + }, + { + name: "passed_error", + err: error1, + want: error1, + }, + { + name: "generated_error", + werr: error2, + want: error2, + output: "Event Watcher error: Second Error\n", + }, + { + name: "passed_and_generated_error", + err: error1, + werr: error2, + want: error2, + output: "Event Watcher failed: First Error\nEvent Watcher error: Second Error\n", + }, + } { + t.Run(tt.name, func(t *testing.T) { + var buf strings.Builder + canceled := false + cancel := func() { canceled = true } + stopped := false + stop := func() { stopped = true } + w := newWatcher(context.TODO(), cancel, nil, stop) + w.stdout = &buf + if tt.werr != nil { + w.errCh <- tt.werr + } + w.warningf = func(f string, v ...any) { + fmt.Fprintf(&buf, f+"\n", v...) + } + got := w.Cleanup(tt.err) + if s := errdiff.Check(got, tt.want); s != "" { + t.Errorf("%s", s) + } + if stopped != true { + t.Errorf("got stopped %v, want %v", stopped, true) + } + if canceled != false { + t.Errorf("got canceled %v, want %v", canceled, false) + } + output := buf.String() + if output != tt.output { + t.Errorf("Got output %q, want %q", output, tt.output) + } + }) + } +} + +func TestWatcher(t *testing.T) { + normalEvent := &EventStatus{Name: "event_name", UID: "uid", Namespace: "ns", Message: "Created container kube-rbac-proxy", Type: EventNormal} + insufficientCPU := &EventStatus{Name: "event_name", UID: "uid", Namespace: "ns", Message: "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", Type: EventWarning} + insufficientMem := &EventStatus{Name: "event_name", UID: "uid", Namespace: "ns", Message: "0/1 nodes are available: 1 Insufficient memory. 
preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", Type: EventWarning} + for _, tt := range []struct { + name string + s *EventStatus + output string + closed bool + canceled bool + }{ + { + name: "no_updates", + closed: true, + }, + { + name: "all", + s: normalEvent, + closed: true, + output: ` +01:23:45 NS: ns Event name: event_name Type: Normal Message: Created container kube-rbac-proxy +`[1:], + }, + { + name: "failed_insufficient_cpu", + s: insufficientCPU, + closed: true, + output: ` +01:23:45 NS: ns Event name: event_name Type: Warning Message: 0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. +`[1:], + }, + { + name: "failed_insufficient_memory", + s: insufficientMem, + closed: true, + output: ` +01:23:45 NS: ns Event name: event_name Type: Warning Message: 0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. +`[1:], + }, + { + name: "canceled", + canceled: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + stopped := false + stop := func() { stopped = true } + ch := make(chan *EventStatus, 2) + var buf strings.Builder + w := newWatcher(ctx, cancel, ch, stop) + w.progress = true + w.stdout = &buf + if tt.s != nil { + ch <- tt.s + } + if tt.closed { + close(ch) + } + if tt.canceled { + cancel() + } + w.watch() + if !stopped { + t.Errorf("Watcher did not stop") + } + if output := buf.String(); output != tt.output { + t.Errorf("Got output %q, want %q", output, tt.output) + } + }) + } +} + +func TestNewWatcher(t *testing.T) { + if _, err := NewWatcher(context.TODO(), nil, func() {}); err == nil { + t.Errorf("NewWatcher did not return an error on bad input") + } + client := kfake.NewSimpleClientset() + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + w, err := NewWatcher(ctx, client, func() {}) + if err != nil { + t.Errorf("NewWatcher failed: %v", err) + } + if w.ctx != ctx { + t.Errorf("Watcher has the wrong context") + } + if w.ch == nil { + t.Errorf("Watcher has no channel") + } + if w.wstop == nil { + t.Errorf("Watcher has no stop") + } + if w.cancel == nil { + t.Errorf("Watcher has no cancel") + } + if w.stdout != os.Stdout { + t.Errorf("Watcher's stdout is not os.Stdout") + } + if w.warningf == nil { + t.Errorf("Watcher's warningf is nil") + } + if w.eventStates == nil { + t.Errorf("Watcher did not make eventStates") + } +} + +func TestIsEventNormal(t *testing.T) { + var buf strings.Builder + + canceled := false + cancel := func() { canceled = true } + stopped := false + stop := func() { stopped = true } + + w := newWatcher(context.TODO(), cancel, nil, stop) + w.stdout = &buf + w.SetProgress(true) + + var seen string + + const ( + uid1 = types.UID("uid1") + uid2 = types.UID("uid2") + uid3 = types.UID("uid3") + ) + + for _, tt := range []struct { + name string + event *EventStatus + want string + errch string + stopped bool + canceled bool + }{ + { + name: "normal", + event: &EventStatus{Name: "event1", UID: uid1, Namespace: "ns1", Type: EventNormal, Message: "normal event"}, + want: ` +01:23:45 NS: ns1 Event name: event1 Type: Normal Message: normal event +`[1:], + }, + { + name: "insufficient_cpu", + event: &EventStatus{Name: "event2", UID: uid1, Namespace: "ns1", Type: EventWarning, Message: "0/1 nodes are available: 1 Insufficient cpu. 
preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.."}, + want: ` +01:23:45 NS: ns1 Event name: event2 Type: Warning Message: 0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. +`[1:], + errch: "Event failed due to . Message: 0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + canceled: true, + }, + { + name: "insufficient_memory", + event: &EventStatus{Name: "event3", UID: uid1, Namespace: "ns1", Type: EventWarning, Message: "0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.."}, + want: ` +01:23:45 NS: ns1 Event name: event3 Type: Warning Message: 0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. +`[1:], + errch: "Event failed due to . Message: 0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + canceled: true, + }, + } { + w.isEventNormal(tt.event) + t.Run(tt.name, func(t *testing.T) { + got := buf.String() + if !strings.HasPrefix(got, seen) { + t.Fatalf("got %q, wanted prefix %q", got, seen) + } + seen, got = got, got[len(seen):] + if got != tt.want { + t.Errorf("got %q, want %q", got, tt.want) + } + var errch string + select { + case err := <-w.errCh: + errch = err.Error() + default: + } + if errch != tt.errch { + t.Errorf("got error %s, want error %s", errch, tt.errch) + } + if stopped != tt.stopped { + t.Errorf("got stopped %v, want %v", stopped, tt.stopped) + } + stopped = false + if canceled != tt.canceled { + t.Errorf("got canceled %v, want %v", canceled, tt.canceled) + } + canceled = false + }) + } +} diff --git a/topo/topo.go b/topo/topo.go index 4c98cd19..11e39a08 100644 --- a/topo/topo.go +++ b/topo/topo.go @@ -26,6 +26,7 @@ import ( "github.com/kr/pretty" topologyclientv1 "github.com/networkop/meshnet-cni/api/clientset/v1beta1" topologyv1 "github.com/networkop/meshnet-cni/api/types/v1beta1" + "github.com/openconfig/kne/events" "github.com/openconfig/kne/metrics" "github.com/openconfig/kne/pods" cpb "github.com/openconfig/kne/proto/controller" @@ -232,9 +233,6 @@ func (m *Manager) Create(ctx context.Context, timeout time.Duration) (rerr error finish := m.reportCreateEvent(ctx) defer func() { finish(rerr) }() } - if err := m.push(ctx); err != nil { - return err - } ctx, cancel := context.WithCancel(ctx) // Watch the containter status of the pods so we can fail if a container fails to start running. if w, err := pods.NewWatcher(ctx, m.kClient, cancel); err != nil { @@ -246,6 +244,18 @@ func (m *Manager) Create(ctx context.Context, timeout time.Duration) (rerr error rerr = w.Cleanup(rerr) }() } + if w, err := events.NewWatcher(ctx, m.kClient, cancel); err != nil { + log.Warningf("Failed to start event watcher: %v", err) + } else { + w.SetProgress(m.progress) + defer func() { + cancel() + rerr = w.Cleanup(rerr) + }() + } + if err := m.push(ctx); err != nil { + return err + } if err := m.checkNodeStatus(ctx, timeout); err != nil { return err }
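A minimal usage sketch (not part of the diff above) of how a caller other than topo.go might wire in the new events.Watcher; the package name, the deploy function, and the logging calls are illustrative assumptions. It follows the same pattern the topo.go hunk adds: create the watcher before pushing resources so scheduling failures are caught, let it cancel the shared context when an event message contains "Insufficient cpu" or "Insufficient memory", and fold its recorded error into the caller's return value via Cleanup.

package example

import (
	"context"
	"log"

	"github.com/openconfig/kne/events"
	"k8s.io/client-go/kubernetes"
)

// deploy is a hypothetical caller that creates resources in a cluster while
// the event watcher reports event state changes and aborts on fatal
// scheduling events.
func deploy(ctx context.Context, client kubernetes.Interface) (rerr error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	w, err := events.NewWatcher(ctx, client, cancel)
	if err != nil {
		// A watcher failure is not fatal to the deployment itself.
		log.Printf("Failed to start event watcher: %v", err)
	} else {
		w.SetProgress(true) // print each event state change as it arrives
		defer func() {
			cancel()
			// Cleanup stops the watcher and returns any error it recorded
			// (for example an Insufficient cpu event); otherwise it passes
			// rerr through unchanged.
			rerr = w.Cleanup(rerr)
		}()
	}

	// ... create pods, services, etc. using the shared ctx ...
	return nil
}

GetEventStatus and WatchEventStatus can also be called directly when a one-shot listing or a raw channel of *EventStatus values is preferred over the higher-level Watcher.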