From f9ab7b8afe6c1491361616aafefc3599f786c109 Mon Sep 17 00:00:00 2001 From: Neha Manjunath Date: Thu, 13 Jul 2023 15:42:56 +0000 Subject: [PATCH 01/11] Initial check-in event watcher --- events/events.go | 114 +++++++++++++++++++++++++++++++++++++++++++++ events/status.go | 119 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 233 insertions(+) create mode 100644 events/events.go create mode 100644 events/status.go diff --git a/events/events.go b/events/events.go new file mode 100644 index 00000000..abadbead --- /dev/null +++ b/events/events.go @@ -0,0 +1,114 @@ +// Package events provides events status for a namespace using kubectl. +package events + +import ( + "context" + "errors" + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" +) + +// A EventStatus represents the status of a single Event. +type EventStatus struct { + Name string // name of event + UID types.UID + Namespace string + Event corev1.Event // copy of the raw event +} + +func (p *EventStatus) String() string { + var buf strings.Builder + add := func(k, v string) { + if v != "" { + fmt.Fprintf(&buf, ", %s: %q", k, v) + } + } + fmt.Fprintf(&buf, "{Name: %q", p.Name) + add("UID", string(p.UID)) + add("Namespace", p.Namespace) + return buf.String() +} + +func (p *EventStatus) Equal(q *EventStatus) bool { + if p.UID != q.UID || + p.Name != q.Name || + p.Namespace != q.Namespace { + return false + } + return true +} + +// Values for EventType. These constants are copied from k8s.io/api/core/v1 as a +// convenience. +const ( + EventNormal = corev1.EventTypeNormal + EventWarning = corev1.EventTypeWarning +) + +// GetEventStatus returns the status of the events found in the supplied namespace. +func GetEventStatus(ctx context.Context, client kubernetes.Interface, namespace string) ([]*EventStatus, error) { + events, err := client.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + var statuses []*EventStatus + for _, event := range events.Items { + event := event + statuses = append(statuses, EventToStatus(&event)) + } + return statuses, nil +} + +// WatchEventStatus returns a channel on which the status of the events in the +// supplied namespace are written. +func WatchEventStatus(ctx context.Context, client kubernetes.Interface, namespace string) (chan *EventStatus, func(), error) { + if ctx == nil { + return nil, nil, errors.New("WatchEventStatus: nil context ") + } + if client == nil { + return nil, nil, errors.New("WatchEventStatus: nil client ") + } + w, err := client.CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{}) + if err != nil { + return nil, nil, err + } + kch := w.ResultChan() + ch := make(chan *EventStatus, 2) + + go func() { + defer close(ch) + // seen is used to drop duplicate updates + seen := map[types.UID]*EventStatus{} + for event := range kch { + switch e := event.Object.(type) { + case *corev1.Event: + s := EventToStatus(e) + if os, ok := seen[s.UID]; ok { + if s.Equal(os) { + continue + } + } + seen[s.UID] = s + ch <- s + } + } + }() + return ch, w.Stop, nil +} + +// EventToStatus returns a pointer to a new EventStatus for an event. +func EventToStatus(event *corev1.Event) *EventStatus { + s := EventStatus{ + Name: event.ObjectMeta.Name, + Namespace: event.ObjectMeta.Namespace, + UID: event.ObjectMeta.UID, + } + event.DeepCopyInto(&s.Event) + event = &s.Event + return &s +} diff --git a/events/status.go b/events/status.go new file mode 100644 index 00000000..4ea0759b --- /dev/null +++ b/events/status.go @@ -0,0 +1,119 @@ +package events + +import ( + "context" + "fmt" + "io" + "os" + "sync" + "time" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + log "k8s.io/klog/v2" +) + +// A Watcher watches event updates. +type Watcher struct { + ctx context.Context + errCh chan error + wstop func() + cancel func() + eventStates map[types.UID]string + ch chan *EventStatus + stdout io.Writer + warningf func(string, ...any) + + mu sync.Mutex + progress bool + currentNamespace string + currentEvent types.UID +} + +// NewWatcher returns a Watcher on the provided client or an error. The cancel +// function is called when the Watcher determines an event has permanently +// failed. The Watcher will exit if the context provided is canceled, an error +// is encountered, or Cleanup is called. +func NewWatcher(ctx context.Context, client kubernetes.Interface, cancel func()) (*Watcher, error) { + ch, stop, err := WatchEventStatus(ctx, client, "") + if err != nil { + return nil, err + } + w := newWatcher(ctx, cancel, ch, stop) + go w.watch() + return w, nil +} + +func newWatcher(ctx context.Context, cancel func(), ch chan *EventStatus, stop func()) *Watcher { + w := &Watcher{ + ctx: ctx, + ch: ch, + wstop: stop, + cancel: cancel, + stdout: os.Stdout, + eventStates: map[types.UID]string{}, + warningf: log.Warningf, + } + // A channel is used to record errors from the watcher to prevent any + // possible race conditions if Cleanup is called while an update is + // happening. At most one error will be written to the channel. + w.errCh = make(chan error, 1) + w.display("Displaying state changes for events") + return w +} + +// SetProgress determins if progress output should be displayed while watching. +func (w *Watcher) SetProgress(value bool) { + w.mu.Lock() + w.progress = value + w.mu.Unlock() +} + +func (w *Watcher) stop() { + w.mu.Lock() + stop := w.wstop + w.wstop = nil + w.mu.Unlock() + if stop != nil { + stop() + } +} + +// Cleanup should be called when the Watcher is no longer needed. If the +// Watcher encountered an error the provided err is logged and the Watcher error +// is returned, otherwise err is returned. +func (w *Watcher) Cleanup(err error) error { + w.stop() + select { + case werr := <-w.errCh: + if err != nil { + w.warningf("Deploy() failed: %v", err) + } + w.warningf("Deployment failed: %v", werr) + return werr + default: + } + return err +} + +func (w *Watcher) watch() { + defer w.stop() + for { + select { + case s, ok := <-w.ch: + if !ok || !w.updateEvent(s) { + return + } + case <-w.ctx.Done(): + return + } + } +} + +var timeNow = func() string { return time.Now().Format("15:04:05 ") } + +func (w *Watcher) display(format string, v ...any) { + if w.progress { + fmt.Fprintf(w.stdout, timeNow()+format+"\n", v...) + } +} From 879e35dca50fec2b18e66a0e32cd9c0522a69808 Mon Sep 17 00:00:00 2001 From: Neha Manjunath Date: Fri, 14 Jul 2023 21:50:17 +0000 Subject: [PATCH 02/11] Check for insufficient memory, cpu conditions and error out if necessary --- events/events.go | 16 ++++++++-------- events/status.go | 23 ++++++++++++++++++++++- topo/topo.go | 10 ++++++++++ 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/events/events.go b/events/events.go index abadbead..5876ccfa 100644 --- a/events/events.go +++ b/events/events.go @@ -21,23 +21,23 @@ type EventStatus struct { Event corev1.Event // copy of the raw event } -func (p *EventStatus) String() string { +func (e *EventStatus) String() string { var buf strings.Builder add := func(k, v string) { if v != "" { fmt.Fprintf(&buf, ", %s: %q", k, v) } } - fmt.Fprintf(&buf, "{Name: %q", p.Name) - add("UID", string(p.UID)) - add("Namespace", p.Namespace) + fmt.Fprintf(&buf, "{Name: %q", e.Name) + add("UID", string(e.UID)) + add("Namespace", e.Namespace) return buf.String() } -func (p *EventStatus) Equal(q *EventStatus) bool { - if p.UID != q.UID || - p.Name != q.Name || - p.Namespace != q.Namespace { +func (e *EventStatus) Equal(q *EventStatus) bool { + if e.UID != q.UID || + e.Name != q.Name || + e.Namespace != q.Namespace { return false } return true diff --git a/events/status.go b/events/status.go index 4ea0759b..46dd01ef 100644 --- a/events/status.go +++ b/events/status.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "strings" "sync" "time" @@ -30,6 +31,8 @@ type Watcher struct { currentEvent types.UID } +var errorMsgs = [2]string{"Insufficient memory", "Insufficient cpu"} + // NewWatcher returns a Watcher on the provided client or an error. The cancel // function is called when the Watcher determines an event has permanently // failed. The Watcher will exit if the context provided is canceled, an error @@ -101,7 +104,7 @@ func (w *Watcher) watch() { for { select { case s, ok := <-w.ch: - if !ok || !w.updateEvent(s) { + if !ok || !w.displayEvent(s) { return } case <-w.ctx.Done(): @@ -117,3 +120,21 @@ func (w *Watcher) display(format string, v ...any) { fmt.Fprintf(w.stdout, timeNow()+format+"\n", v...) } } + +func (w *Watcher) displayEvent(s *EventStatus) bool { + if w.progress { + log.Info(timeNow() + s.Event.String() + "\n") + } + + message := s.Event.Message + for _, m := range errorMsgs { + // Error out if namespace is currentnamesapce and message contains predefinedcheck namespace + if w.currentNamespace == s.Namespace && strings.Contains(message, m) { + w.errCh <- fmt.Errorf("Event failed due to %s . Message: %s", s.Event.Reason, message) + w.cancel() + return false + } + } + + return true +} diff --git a/topo/topo.go b/topo/topo.go index 4c98cd19..2e0fe3b7 100644 --- a/topo/topo.go +++ b/topo/topo.go @@ -26,6 +26,7 @@ import ( "github.com/kr/pretty" topologyclientv1 "github.com/networkop/meshnet-cni/api/clientset/v1beta1" topologyv1 "github.com/networkop/meshnet-cni/api/types/v1beta1" + "github.com/openconfig/kne/events" "github.com/openconfig/kne/metrics" "github.com/openconfig/kne/pods" cpb "github.com/openconfig/kne/proto/controller" @@ -246,6 +247,15 @@ func (m *Manager) Create(ctx context.Context, timeout time.Duration) (rerr error rerr = w.Cleanup(rerr) }() } + if w, err := events.NewWatcher(ctx, m.kClient, cancel); err != nil { + log.Warningf("Failed to start event watcher: %v", err) + } else { + w.SetProgress(m.progress) + defer func() { + cancel() + rerr = w.Cleanup(rerr) + }() + } if err := m.checkNodeStatus(ctx, timeout); err != nil { return err } From ac408717a69b6142516328bc586edd28d06a90f4 Mon Sep 17 00:00:00 2001 From: Neha Manjunath Date: Mon, 17 Jul 2023 19:54:56 +0000 Subject: [PATCH 03/11] Add message , type in EventStatus struct --- events/events.go | 4 ++++ events/status.go | 9 +++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/events/events.go b/events/events.go index 5876ccfa..876740a7 100644 --- a/events/events.go +++ b/events/events.go @@ -18,6 +18,8 @@ type EventStatus struct { Name string // name of event UID types.UID Namespace string + Message string + Type string Event corev1.Event // copy of the raw event } @@ -110,5 +112,7 @@ func EventToStatus(event *corev1.Event) *EventStatus { } event.DeepCopyInto(&s.Event) event = &s.Event + s.Type = event.DeepCopy().Type + s.Message = event.DeepCopy().Message return &s } diff --git a/events/status.go b/events/status.go index 46dd01ef..edf04a17 100644 --- a/events/status.go +++ b/events/status.go @@ -122,11 +122,12 @@ func (w *Watcher) display(format string, v ...any) { } func (w *Watcher) displayEvent(s *EventStatus) bool { - if w.progress { - log.Info(timeNow() + s.Event.String() + "\n") - } + w.display("NS: %s", s.Namespace) + w.display("Event: %s", s.Name) + w.display("EventType: %s", s.Type) + w.display("Event message: %s", s.Message) - message := s.Event.Message + message := s.Message for _, m := range errorMsgs { // Error out if namespace is currentnamesapce and message contains predefinedcheck namespace if w.currentNamespace == s.Namespace && strings.Contains(message, m) { From 0d3d48549537a8b8f3944dad05837a752611dea8 Mon Sep 17 00:00:00 2001 From: Neha Manjunath Date: Mon, 17 Jul 2023 20:05:04 +0000 Subject: [PATCH 04/11] Add unit tests for status.go --- events/status_test.go | 234 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 234 insertions(+) create mode 100644 events/status_test.go diff --git a/events/status_test.go b/events/status_test.go new file mode 100644 index 00000000..8cf88d17 --- /dev/null +++ b/events/status_test.go @@ -0,0 +1,234 @@ +package events + +import ( + "context" + "errors" + "fmt" + "os" + "strings" + "testing" + + "github.com/h-fam/errdiff" + kfake "k8s.io/client-go/kubernetes/fake" +) + +func init() { + _ = timeNow() + timeNow = func() string { return "01:23:45 " } +} + +func TestDisplay(t *testing.T) { + var buf strings.Builder + var w Watcher + w.stdout = &buf + w.SetProgress(false) + w.display("hello %s", "world") + if got := buf.String(); got != "" { + t.Errorf("display w/o progress got %q, want \"\"", got) + } + buf.Reset() + w.SetProgress(true) + w.display("hello %s", "world") + want := "01:23:45 hello world\n" + got := buf.String() + if got != want { + t.Errorf("display got %q, want %q", got, want) + } +} + +func TestStop(t *testing.T) { + canceled := false + cancel := func() { canceled = true } + stopped := false + stop := func() { stopped = true } + newWatcher(context.TODO(), cancel, nil, stop).stop() + if stopped != true { + t.Errorf("got stopped %v, want %v", stopped, true) + } + if canceled != false { + t.Errorf("got canceled %v, want %v", canceled, false) + } +} + +func TestCleanup(t *testing.T) { + var ( + error1 = errors.New("First Error") + error2 = errors.New("Second Error") + ) + for _, tt := range []struct { + name string + err error + werr error + want error + canceled bool + output string + }{ + { + name: "no_errors", + }, + { + name: "passed_error", + err: error1, + want: error1, + }, + { + name: "generated_error", + werr: error2, + want: error2, + output: "Deployment failed: Second Error\n", + }, + { + name: "passed_and_generated_error", + err: error1, + werr: error2, + want: error2, + output: "Deploy() failed: First Error\nDeployment failed: Second Error\n", + }, + } { + t.Run(tt.name, func(t *testing.T) { + var buf strings.Builder + canceled := false + cancel := func() { canceled = true } + stopped := false + stop := func() { stopped = true } + w := newWatcher(context.TODO(), cancel, nil, stop) + w.stdout = &buf + if tt.werr != nil { + w.errCh <- tt.werr + } + w.warningf = func(f string, v ...any) { + fmt.Fprintf(&buf, f+"\n", v...) + } + got := w.Cleanup(tt.err) + if s := errdiff.Check(got, tt.want); s != "" { + t.Errorf("%s", s) + } + if stopped != true { + t.Errorf("got stopped %v, want %v", stopped, true) + } + if canceled != false { + t.Errorf("got canceled %v, want %v", canceled, false) + } + output := buf.String() + if output != tt.output { + t.Errorf("Got output %q, want %q", output, tt.output) + } + }) + } +} + +func TestWatcher(t *testing.T) { + normalEvent := &EventStatus{Name: "event_name", UID: "uid", Namespace: "ns", Message: "Created container kube-rbac-proxy", Type: EventNormal} + insufficientCPU := &EventStatus{Name: "event_name", UID: "uid", Namespace: "ns", Message: "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", Type: EventWarning} + insufficientMem := &EventStatus{Name: "event_name", UID: "uid", Namespace: "ns", Message: "0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", Type: EventWarning} + for _, tt := range []struct { + name string + s *EventStatus + output string + closed bool + canceled bool + }{ + { + name: "no_updates", + closed: true, + }, + { + name: "all", + s: normalEvent, + closed: true, + output: ` +01:23:45 NS: ns +01:23:45 Event: event_name +01:23:45 EventType: Normal +01:23:45 Event message: Created container kube-rbac-proxy +`[1:], + }, + { + name: "failed", + s: insufficientCPU, + closed: true, + output: ` +01:23:45 NS: ns +01:23:45 Event: event_name +01:23:45 EventType: Warning +01:23:45 Event message: 0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. +`[1:], + }, + { + name: "failed", + s: insufficientMem, + closed: true, + output: ` +01:23:45 NS: ns +01:23:45 Event: event_name +01:23:45 EventType: Warning +01:23:45 Event message: 0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. +`[1:], + }, + { + name: "canceled", + canceled: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + stopped := false + stop := func() { stopped = true } + ch := make(chan *EventStatus, 2) + var buf strings.Builder + w := newWatcher(ctx, cancel, ch, stop) + w.progress = true + w.stdout = &buf + if tt.s != nil { + ch <- tt.s + } + if tt.closed { + close(ch) + } + if tt.canceled { + cancel() + } + w.watch() + if !stopped { + t.Errorf("Watcher did not stop") + } + if output := buf.String(); output != tt.output { + t.Errorf("Got output %q, want %q", output, tt.output) + } + }) + } +} + +func TestNewWatcher(t *testing.T) { + if _, err := NewWatcher(context.TODO(), nil, func() {}); err == nil { + t.Errorf("NewWatcher did not return an error on bad input") + } + client := kfake.NewSimpleClientset() + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + w, err := NewWatcher(ctx, client, func() {}) + if err != nil { + t.Errorf("NewWatcher failed: %v", err) + } + if w.ctx != ctx { + t.Errorf("Watcher has the wrong context") + } + if w.ch == nil { + t.Errorf("Watcher has no channel") + } + if w.wstop == nil { + t.Errorf("Watcher has no stop") + } + if w.cancel == nil { + t.Errorf("Watcher has no cancel") + } + if w.stdout != os.Stdout { + t.Errorf("Watcher's stdout is not os.Stdout") + } + if w.warningf == nil { + t.Errorf("Watcher's warningf is nil") + } + if w.eventStates == nil { + t.Errorf("Watcher did not make eventStates") + } +} From 2c58a3a3c0f2d610edcc91dd902e7ce22ab9a57f Mon Sep 17 00:00:00 2001 From: Neha Manjunath Date: Wed, 19 Jul 2023 16:59:37 +0000 Subject: [PATCH 05/11] adding more unit tests --- events/data_test.go | 205 +++++++++++++++++++++++++++++++++++++++++ events/events_test.go | 206 ++++++++++++++++++++++++++++++++++++++++++ events/status.go | 6 ++ 3 files changed, 417 insertions(+) create mode 100644 events/data_test.go create mode 100644 events/events_test.go diff --git a/events/data_test.go b/events/data_test.go new file mode 100644 index 00000000..009c4418 --- /dev/null +++ b/events/data_test.go @@ -0,0 +1,205 @@ +package events + +import ( + "encoding/json" + + corev1 "k8s.io/api/core/v1" +) + +// Variables are named +// +// name#data Raw json from kubectl +// name#event Data converted into a corev1.Event +// name#status PodStatus version +// name#string PodStatus as a string + +func json2event(j string) *corev1.Event { + var event corev1.Event + if err := json.Unmarshal(([]byte)(j), &event); err != nil { + panic(err) + } + return &event +} + +var ( + normal1event = json2event(normal1eventdata) + warning1event = json2event(warning1eventdata) + warning2event = json2event(warning2eventdata) +) + +// We only use 3 events when checking EventStatus.String as these cover all the cases. +var ( + normal1eventstring = `{Name: "service-r2.1772fc5571f425ff", UID: "a756a650-bf61-4da6-8d69-d5ae40bd943c", Namespace: "ceos2"}` + warning1eventstring = `{Name: "r1.177351c307a38f38", UID: "a0450cc2-26e5-46e6-8ef8-be9dec02cd35", Namespace: "ceos1"}` +) + +var ( + normal1eventdata = ` + { + "metadata": { + "name": "service-r2.1772fc5571f425ff", + "namespace": "ceos2", + "uid": "a756a650-bf61-4da6-8d69-d5ae40bd943c", + "resourceVersion": "3243", + "creationTimestamp": "2023-07-18T14:24:14Z", + "managedFields": [ + { + "manager": "speaker", + "operation": "Update", + "apiVersion": "v1", + "time": "2023-07-18T14:24:14Z", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:count": {}, + "f:firstTimestamp": {}, + "f:involvedObject": {}, + "f:lastTimestamp": {}, + "f:message": {}, + "f:reason": {}, + "f:source": { + "f:component": {} + }, + "f:type": {} + } + } + ] + }, + "involvedObject": { + "kind": "Service", + "namespace": "ceos2", + "name": "service-r2", + "uid": "2459a3c5-c353-4a50-aa70-dc897ffb051e", + "apiVersion": "v1", + "resourceVersion": "3076" + }, + "reason": "nodeAssigned", + "message": "announcing from node \"kne-control-plane\" with protocol \"layer2\"", + "source": { + "component": "metallb-speaker" + }, + "firstTimestamp": "2023-07-18T14:24:14Z", + "lastTimestamp": "2023-07-18T14:24:14Z", + "count": 1, + "type": "Normal", + "eventTime": null, + "reportingComponent": "", + "reportingInstance": "" + } + ` + + normal1eventstatus = EventStatus{ + "Name": "service-r2.1772fc5571f425ff", + "UID": "a756a650-bf61-4da6-8d69-d5ae40bd943c", + "Namespace": "ceos2", + "Message": "announcing from node \"kne-control-plane\" with protocol \"layer2\"", + "Type": "Normal", + "Event": { + "metadata": { + "name": "service-r2.1772fc5571f425ff", + "namespace": "ceos2", + "uid": "a756a650-bf61-4da6-8d69-d5ae40bd943c", + "resourceVersion": "3243", + "creationTimestamp": "2023-07-18T14:24:14Z", + "managedFields": [ + { + "manager": "speaker", + "operation": "Update", + "apiVersion": "v1", + "time": "2023-07-18T14:24:14Z", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:count": {}, + "f:firstTimestamp": {}, + "f:involvedObject": {}, + "f:lastTimestamp": {}, + "f:message": {}, + "f:reason": {}, + "f:source": { + "f:component": {} + }, + "f:type": {} + } + } + ] + }, + "involvedObject": { + "kind": "Service", + "namespace": "ceos2", + "name": "service-r2", + "uid": "2459a3c5-c353-4a50-aa70-dc897ffb051e", + "apiVersion": "v1", + "resourceVersion": "3076" + }, + "reason": "nodeAssigned", + "message": "announcing from node \"kne-control-plane\" with protocol \"layer2\"", + "source": { + "component": "metallb-speaker" + }, + "firstTimestamp": "2023-07-18T14:24:14Z", + "lastTimestamp": "2023-07-18T14:24:14Z", + "count": 1, + "type": "Normal", + "eventTime": null, + "reportingComponent": "", + "reportingInstance": "" + } + } + + warning1eventstatus = EventStatus{ + "Name": "r1.177351c307a38f38", + "UID": "a0450cc2-26e5-46e6-8ef8-be9dec02cd35", + "Namespace": "ceos1", + "Message": "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + "Type": "Warning", + "Event": { + "metadata": { + "name": "r1.177351c307a38f38", + "namespace": "ceos1", + "uid": "a0450cc2-26e5-46e6-8ef8-be9dec02cd35", + "resourceVersion": "325871", + "creationTimestamp": "2023-07-19T16:29:43Z", + "managedFields": [ + { + "manager": "kube-scheduler", + "operation": "Update", + "apiVersion": "v1", + "time": "2023-07-19T16:39:50Z", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:count": {}, + "f:firstTimestamp": {}, + "f:involvedObject": {}, + "f:lastTimestamp": {}, + "f:message": {}, + "f:reason": {}, + "f:source": { + "f:component": {} + }, + "f:type": {} + } + } + ] + }, + "involvedObject": { + "kind": "Pod", + "namespace": "ceos1", + "name": "r1", + "uid": "b69a3fd5-2f25-42aa-91c4-146c40e53053", + "apiVersion": "v1", + "resourceVersion": "323358" + }, + "reason": "FailedScheduling", + "message": "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + "source": { + "component": "default-scheduler" + }, + "firstTimestamp": "2023-07-19T16:29:43Z", + "lastTimestamp": "2023-07-19T16:39:50Z", + "count": 3, + "type": "Warning", + "eventTime": null, + "reportingComponent": "", + "reportingInstance": "" + } + } +) \ No newline at end of file diff --git a/events/events_test.go b/events/events_test.go new file mode 100644 index 00000000..1bfcdc40 --- /dev/null +++ b/events/events_test.go @@ -0,0 +1,206 @@ +package events + +import ( + "context" + "errors" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/h-fam/errdiff" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + kfake "k8s.io/client-go/kubernetes/fake" + ktest "k8s.io/client-go/testing" +) + +func TestGetEventStatus(t *testing.T) { + client := kfake.NewSimpleClientset() + var eventUpdates = [][]*corev1.Event{ + {normal1event}, + {warning1event}, + {warning2event}, + } + + var eventGetUpdatesStatus = [][]EventStatus{ + {normal1eventstatus}, + } + + next := -1 + + client.PrependReactor("list", "events", func(action ktest.Action) (bool, runtime.Object, error) { + next++ + if next >= len(eventUpdates) { + return false, nil, nil + } + el := corev1.EventList{} + for _, e := range eventUpdates[next] { + el.Items = append(el.Items, *e) + } + return true, &el, nil + }) + + for i := range eventUpdates { + got, err := GetEventStatus(context.Background(), client, "") + if err != nil { + t.Fatal(err) + } + want := eventGetUpdatesStatus[i] + j := -1 + for len(got) > 0 { + j++ + gotevent := got[0] + got = got[1:] + if len(want) == 0 { + t.Errorf("%d-%d: extra event: %v", i, j, got) + continue + } + wantevent := want[0] + want = want[1:] + if !gotevent.Equal(&wantevent) { + t.Errorf("%d-%d FAIL:\ngot : %s\nwant: %s", i, j, gotevent.String(), wantevent.String()) + } else if false { + t.Logf("%d-%d okay:\ngot : %s\nwant: %s", i, j, gotevent.String(), wantevent.String()) + } + + if false { + if s := cmp.Diff(got, want); s != "" { + t.Errorf("update %d: %s\n", i, s) + } + } + } + for len(want) > 0 { + wantevent := want[0] + want = want[1:] + t.Errorf("%d: missing event: %v", i, wantevent) + } + } +} + +func TestGetEventStatusError(t *testing.T) { + myError := errors.New("event error") + client := kfake.NewSimpleClientset() + client.PrependReactor("list", "events", func(action ktest.Action) (bool, runtime.Object, error) { + return true, nil, myError + }) + + _, err := GetEventStatus(context.Background(), client, "") + if s := errdiff.Check(err, myError); s != "" { + t.Error(s) + } +} + +type fakeWatch struct { + ch chan watch.Event + done chan struct{} +} + +func (f *fakeWatch) Stop() { + close(f.done) +} + +func (f *fakeWatch) ResultChan() <-chan watch.Event { + return f.ch +} + +func TestWatchEventStatus(t *testing.T) { + if _, _, err := WatchEventStatus(nil, nil, ""); err == nil { //nolint:all + t.Errorf("WatchEventStatus does not return an error on a nil context.") + } + if _, _, err := WatchEventStatus(context.TODO(), nil, ""); err == nil { + t.Errorf("WatchEventStatus does not return an error on a nil client.") + } + client := kfake.NewSimpleClientset() + + var wanted = []*EventStatus{ + &normal1eventstatus, + } + var updates = []*corev1.Event{ + normal1event, + warning1event, + warning2event, + } + client.PrependWatchReactor("*", func(action ktest.Action) (bool, watch.Interface, error) { + f := &fakeWatch{ + ch: make(chan watch.Event, 1), + done: make(chan struct{}), + } + go func() { + kind := watch.Added + for _, u := range updates { + select { + case f.ch <- watch.Event{ + Type: kind, + Object: u, + }: + kind = watch.Modified + case <-f.done: + return + } + } + }() + return true, f, nil + }) + + ch, stop, err := WatchEventStatus(context.Background(), client, "") + if err != nil { + t.Fatal(err) + } + defer stop() + for i, want := range wanted { + got, ok := <-ch + if !ok { + t.Fatalf("channel closed early") + } + if !got.Equal(want) { + t.Fatalf("#%d\ngot : %v\nwant: %v", i, got, want) + } + } +} + +func TestWatchEventStatusError(t *testing.T) { + client := kfake.NewSimpleClientset() + myError := errors.New("watch error") + client.PrependWatchReactor("*", func(action ktest.Action) (bool, watch.Interface, error) { + f := &fakeWatch{ + ch: make(chan watch.Event, 1), + done: make(chan struct{}), + } + return true, f, myError + }) + + _, _, err := WatchEventStatus(context.Background(), client, "") + if s := errdiff.Check(err, myError); s != "" { + t.Error(s) + } +} + +func TestString(t *testing.T) { + for _, tt := range []struct { + name string + event *EventStatus + status string + }{ + {"event1", &EventStatus{Name: "event1", Namespace: "ns", UID: "event-1"}, `{Name: "event1", UID: "event-1", Namespace: "ns"`}, // No containers + } { + t.Run(tt.name, func(t *testing.T) { + status := tt.event.String() + if status != tt.status { + t.Errorf("Got/Want:\n%s\n%s", status, tt.status) + } + }) + } +} + +func TestEqual(t *testing.T) { + different := &EventStatus{} + for i, event := range []*EventStatus{&normal1eventstatus} { + lintEvent := event + if !event.Equal(lintEvent) { + t.Errorf("#%d: Equal returned false on equal events", i) + } + if event.Equal(different) { + t.Errorf("#%d: Equal returned true on different events", i) + } + } +} diff --git a/events/status.go b/events/status.go index edf04a17..139ecda9 100644 --- a/events/status.go +++ b/events/status.go @@ -122,6 +122,12 @@ func (w *Watcher) display(format string, v ...any) { } func (w *Watcher) displayEvent(s *EventStatus) bool { + newNamespace := s.Namespace != w.currentNamespace + if newNamespace { + w.currentNamespace = s.Namespace + w.display("NS: %s", s.Namespace) + newNamespace = false + } w.display("NS: %s", s.Namespace) w.display("Event: %s", s.Name) w.display("EventType: %s", s.Type) From 5fc1f946b3283303c9c2182bf69d38a80860216a Mon Sep 17 00:00:00 2001 From: Neha Manjunath Date: Thu, 20 Jul 2023 15:18:09 +0000 Subject: [PATCH 06/11] fix linter and test failures --- events/data_test.go | 233 ++++++++++++++++++++++-------------------- events/events_test.go | 8 +- events/status.go | 5 +- 3 files changed, 131 insertions(+), 115 deletions(-) diff --git a/events/data_test.go b/events/data_test.go index 009c4418..c8bd4e92 100644 --- a/events/data_test.go +++ b/events/data_test.go @@ -31,6 +31,7 @@ var ( var ( normal1eventstring = `{Name: "service-r2.1772fc5571f425ff", UID: "a756a650-bf61-4da6-8d69-d5ae40bd943c", Namespace: "ceos2"}` warning1eventstring = `{Name: "r1.177351c307a38f38", UID: "a0450cc2-26e5-46e6-8ef8-be9dec02cd35", Namespace: "ceos1"}` + warning2eventstring = `{Name: "r1.17735337a69ffd33", UID: "efa6a664-ccf3-4ba8-a30b-09e0ebeaa269", Namespace: "ceos6"}` ) var ( @@ -88,118 +89,130 @@ var ( ` normal1eventstatus = EventStatus{ - "Name": "service-r2.1772fc5571f425ff", - "UID": "a756a650-bf61-4da6-8d69-d5ae40bd943c", - "Namespace": "ceos2", - "Message": "announcing from node \"kne-control-plane\" with protocol \"layer2\"", - "Type": "Normal", - "Event": { - "metadata": { - "name": "service-r2.1772fc5571f425ff", - "namespace": "ceos2", - "uid": "a756a650-bf61-4da6-8d69-d5ae40bd943c", - "resourceVersion": "3243", - "creationTimestamp": "2023-07-18T14:24:14Z", - "managedFields": [ - { - "manager": "speaker", - "operation": "Update", - "apiVersion": "v1", - "time": "2023-07-18T14:24:14Z", - "fieldsType": "FieldsV1", - "fieldsV1": { - "f:count": {}, - "f:firstTimestamp": {}, - "f:involvedObject": {}, - "f:lastTimestamp": {}, - "f:message": {}, - "f:reason": {}, - "f:source": { - "f:component": {} - }, - "f:type": {} - } + Name: "service-r2.1772fc5571f425ff", + UID: "a756a650-bf61-4da6-8d69-d5ae40bd943c", + Namespace: "ceos2", + Message: "announcing from node \"kne-control-plane\" with protocol \"layer2\"", + Type: "Normal", + } + + warning1eventdata = ` + { + "metadata": { + "name": "r1.177351c307a38f38", + "namespace": "ceos1", + "uid": "a0450cc2-26e5-46e6-8ef8-be9dec02cd35", + "resourceVersion": "325871", + "creationTimestamp": "2023-07-19T16:29:43Z", + "managedFields": [ + { + "manager": "kube-scheduler", + "operation": "Update", + "apiVersion": "v1", + "time": "2023-07-19T16:39:50Z", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:count": {}, + "f:firstTimestamp": {}, + "f:involvedObject": {}, + "f:lastTimestamp": {}, + "f:message": {}, + "f:reason": {}, + "f:source": { + "f:component": {} + }, + "f:type": {} } - ] - }, - "involvedObject": { - "kind": "Service", - "namespace": "ceos2", - "name": "service-r2", - "uid": "2459a3c5-c353-4a50-aa70-dc897ffb051e", - "apiVersion": "v1", - "resourceVersion": "3076" - }, - "reason": "nodeAssigned", - "message": "announcing from node \"kne-control-plane\" with protocol \"layer2\"", - "source": { - "component": "metallb-speaker" - }, - "firstTimestamp": "2023-07-18T14:24:14Z", - "lastTimestamp": "2023-07-18T14:24:14Z", - "count": 1, - "type": "Normal", - "eventTime": null, - "reportingComponent": "", - "reportingInstance": "" - } + } + ] + }, + "involvedObject": { + "kind": "Pod", + "namespace": "ceos1", + "name": "r1", + "uid": "b69a3fd5-2f25-42aa-91c4-146c40e53053", + "apiVersion": "v1", + "resourceVersion": "323358" + }, + "reason": "FailedScheduling", + "message": "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + "source": { + "component": "default-scheduler" + }, + "firstTimestamp": "2023-07-19T16:29:43Z", + "lastTimestamp": "2023-07-19T16:39:50Z", + "count": 3, + "type": "Warning", + "eventTime": null, + "reportingComponent": "", + "reportingInstance": "" } - + ` warning1eventstatus = EventStatus{ - "Name": "r1.177351c307a38f38", - "UID": "a0450cc2-26e5-46e6-8ef8-be9dec02cd35", - "Namespace": "ceos1", - "Message": "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", - "Type": "Warning", - "Event": { - "metadata": { - "name": "r1.177351c307a38f38", - "namespace": "ceos1", - "uid": "a0450cc2-26e5-46e6-8ef8-be9dec02cd35", - "resourceVersion": "325871", - "creationTimestamp": "2023-07-19T16:29:43Z", - "managedFields": [ - { - "manager": "kube-scheduler", - "operation": "Update", - "apiVersion": "v1", - "time": "2023-07-19T16:39:50Z", - "fieldsType": "FieldsV1", - "fieldsV1": { - "f:count": {}, - "f:firstTimestamp": {}, - "f:involvedObject": {}, - "f:lastTimestamp": {}, - "f:message": {}, - "f:reason": {}, - "f:source": { - "f:component": {} - }, - "f:type": {} - } + Name: "r1.177351c307a38f38", + UID: "a0450cc2-26e5-46e6-8ef8-be9dec02cd35", + Namespace: "ceos1", + Message: "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + Type: "Warning", + } + + warning2eventdata = ` + { + "metadata": { + "name": "r1.17735337a69ffd33", + "namespace": "ceos6", + "uid": "efa6a664-ccf3-4ba8-a30b-09e0ebeaa269", + "resourceVersion": "1621", + "creationTimestamp": "2023-07-19T16:56:23Z", + "managedFields": [ + { + "manager": "kube-scheduler", + "operation": "Update", + "apiVersion": "v1", + "time": "2023-07-19T16:56:23Z", + "fieldsType": "FieldsV1", + "fieldsV1": { + "f:count": {}, + "f:firstTimestamp": {}, + "f:involvedObject": {}, + "f:lastTimestamp": {}, + "f:message": {}, + "f:reason": {}, + "f:source": { + "f:component": {} + }, + "f:type": {} } - ] - }, - "involvedObject": { - "kind": "Pod", - "namespace": "ceos1", - "name": "r1", - "uid": "b69a3fd5-2f25-42aa-91c4-146c40e53053", - "apiVersion": "v1", - "resourceVersion": "323358" - }, - "reason": "FailedScheduling", - "message": "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", - "source": { - "component": "default-scheduler" - }, - "firstTimestamp": "2023-07-19T16:29:43Z", - "lastTimestamp": "2023-07-19T16:39:50Z", - "count": 3, - "type": "Warning", - "eventTime": null, - "reportingComponent": "", - "reportingInstance": "" - } + } + ] + }, + "involvedObject": { + "kind": "Pod", + "namespace": "ceos6", + "name": "r1", + "uid": "140da1fb-0ff4-4b9d-b512-41c64bd9b1e1", + "apiVersion": "v1", + "resourceVersion": "1620" + }, + "reason": "FailedScheduling", + "message": "0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + "source": { + "component": "default-scheduler" + }, + "firstTimestamp": "2023-07-19T16:56:23Z", + "lastTimestamp": "2023-07-19T16:56:23Z", + "count": 1, + "type": "Warning", + "eventTime": null, + "reportingComponent": "", + "reportingInstance": "" + } + ` + warning2eventstatus = EventStatus{ + Name: "r1.17735337a69ffd33", + UID: "efa6a664-ccf3-4ba8-a30b-09e0ebeaa269", + Namespace: "ceos6", + Message: "0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + Type: "Warning", } -) \ No newline at end of file +) diff --git a/events/events_test.go b/events/events_test.go index 1bfcdc40..8e4c97c2 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -24,6 +24,8 @@ func TestGetEventStatus(t *testing.T) { var eventGetUpdatesStatus = [][]EventStatus{ {normal1eventstatus}, + {warning1eventstatus}, + {warning2eventstatus}, } next := -1 @@ -114,6 +116,8 @@ func TestWatchEventStatus(t *testing.T) { var wanted = []*EventStatus{ &normal1eventstatus, + &warning1eventstatus, + &warning2eventstatus, } var updates = []*corev1.Event{ normal1event, @@ -181,7 +185,7 @@ func TestString(t *testing.T) { event *EventStatus status string }{ - {"event1", &EventStatus{Name: "event1", Namespace: "ns", UID: "event-1"}, `{Name: "event1", UID: "event-1", Namespace: "ns"`}, // No containers + {"event1", &EventStatus{Name: "event1", Namespace: "ns", UID: "event-1"}, `{Name: "event1", UID: "event-1", Namespace: "ns"`}, } { t.Run(tt.name, func(t *testing.T) { status := tt.event.String() @@ -194,7 +198,7 @@ func TestString(t *testing.T) { func TestEqual(t *testing.T) { different := &EventStatus{} - for i, event := range []*EventStatus{&normal1eventstatus} { + for i, event := range []*EventStatus{&normal1eventstatus, &warning1eventstatus, &warning2eventstatus} { lintEvent := event if !event.Equal(lintEvent) { t.Errorf("#%d: Equal returned false on equal events", i) diff --git a/events/status.go b/events/status.go index 139ecda9..394a8d17 100644 --- a/events/status.go +++ b/events/status.go @@ -125,7 +125,6 @@ func (w *Watcher) displayEvent(s *EventStatus) bool { newNamespace := s.Namespace != w.currentNamespace if newNamespace { w.currentNamespace = s.Namespace - w.display("NS: %s", s.Namespace) newNamespace = false } w.display("NS: %s", s.Namespace) @@ -135,8 +134,8 @@ func (w *Watcher) displayEvent(s *EventStatus) bool { message := s.Message for _, m := range errorMsgs { - // Error out if namespace is currentnamesapce and message contains predefinedcheck namespace - if w.currentNamespace == s.Namespace && strings.Contains(message, m) { + // Error out if message contains predefined message + if strings.Contains(message, m) { w.errCh <- fmt.Errorf("Event failed due to %s . Message: %s", s.Event.Reason, message) w.cancel() return false From 9927c148fb8224d0ec89a4cf3206b7a41feb6703 Mon Sep 17 00:00:00 2001 From: Neha Manjunath Date: Thu, 20 Jul 2023 22:30:30 +0000 Subject: [PATCH 07/11] Fix case to handling event watching for multiple topology create commands --- events/events.go | 13 ++++--------- events/events_test.go | 5 +++++ events/status.go | 6 +++--- events/status_test.go | 6 +++--- topo/topo.go | 6 +++--- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/events/events.go b/events/events.go index 876740a7..7b64cd2b 100644 --- a/events/events.go +++ b/events/events.go @@ -81,22 +81,17 @@ func WatchEventStatus(ctx context.Context, client kubernetes.Interface, namespac } kch := w.ResultChan() ch := make(chan *EventStatus, 2) + initialTimestamp := metav1.Now() go func() { defer close(ch) - // seen is used to drop duplicate updates - seen := map[types.UID]*EventStatus{} for event := range kch { switch e := event.Object.(type) { case *corev1.Event: - s := EventToStatus(e) - if os, ok := seen[s.UID]; ok { - if s.Equal(os) { - continue - } + if !e.CreationTimestamp.Before(&initialTimestamp) { + s := EventToStatus(e) + ch <- s } - seen[s.UID] = s - ch <- s } } }() diff --git a/events/events_test.go b/events/events_test.go index 8e4c97c2..c993642a 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -4,10 +4,12 @@ import ( "context" "errors" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/h-fam/errdiff" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" kfake "k8s.io/client-go/kubernetes/fake" @@ -124,6 +126,9 @@ func TestWatchEventStatus(t *testing.T) { warning1event, warning2event, } + for _, u := range updates { + u.CreationTimestamp = metav1.NewTime(time.Now().Add(time.Minute)) + } client.PrependWatchReactor("*", func(action ktest.Action) (bool, watch.Interface, error) { f := &fakeWatch{ ch: make(chan watch.Event, 1), diff --git a/events/status.go b/events/status.go index 394a8d17..442b251b 100644 --- a/events/status.go +++ b/events/status.go @@ -90,9 +90,9 @@ func (w *Watcher) Cleanup(err error) error { select { case werr := <-w.errCh: if err != nil { - w.warningf("Deploy() failed: %v", err) + w.warningf("Create() failed: %v", err) } - w.warningf("Deployment failed: %v", werr) + w.warningf("Topology creation failed failed: %v", werr) return werr default: } @@ -128,7 +128,7 @@ func (w *Watcher) displayEvent(s *EventStatus) bool { newNamespace = false } w.display("NS: %s", s.Namespace) - w.display("Event: %s", s.Name) + w.display("Event name: %s", s.Name) w.display("EventType: %s", s.Type) w.display("Event message: %s", s.Message) diff --git a/events/status_test.go b/events/status_test.go index 8cf88d17..f4efcf7c 100644 --- a/events/status_test.go +++ b/events/status_test.go @@ -138,7 +138,7 @@ func TestWatcher(t *testing.T) { closed: true, output: ` 01:23:45 NS: ns -01:23:45 Event: event_name +01:23:45 Event name: event_name 01:23:45 EventType: Normal 01:23:45 Event message: Created container kube-rbac-proxy `[1:], @@ -149,7 +149,7 @@ func TestWatcher(t *testing.T) { closed: true, output: ` 01:23:45 NS: ns -01:23:45 Event: event_name +01:23:45 Event name: event_name 01:23:45 EventType: Warning 01:23:45 Event message: 0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. `[1:], @@ -160,7 +160,7 @@ func TestWatcher(t *testing.T) { closed: true, output: ` 01:23:45 NS: ns -01:23:45 Event: event_name +01:23:45 Event name: event_name 01:23:45 EventType: Warning 01:23:45 Event message: 0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. `[1:], diff --git a/topo/topo.go b/topo/topo.go index 2e0fe3b7..11e39a08 100644 --- a/topo/topo.go +++ b/topo/topo.go @@ -233,9 +233,6 @@ func (m *Manager) Create(ctx context.Context, timeout time.Duration) (rerr error finish := m.reportCreateEvent(ctx) defer func() { finish(rerr) }() } - if err := m.push(ctx); err != nil { - return err - } ctx, cancel := context.WithCancel(ctx) // Watch the containter status of the pods so we can fail if a container fails to start running. if w, err := pods.NewWatcher(ctx, m.kClient, cancel); err != nil { @@ -256,6 +253,9 @@ func (m *Manager) Create(ctx context.Context, timeout time.Duration) (rerr error rerr = w.Cleanup(rerr) }() } + if err := m.push(ctx); err != nil { + return err + } if err := m.checkNodeStatus(ctx, timeout); err != nil { return err } From 85fd162e85e07fa0e0af52032b26df05cf2a1bc4 Mon Sep 17 00:00:00 2001 From: Neha Manjunath Date: Thu, 20 Jul 2023 23:24:50 +0000 Subject: [PATCH 08/11] Linter error handling, additional unit tests --- events/events.go | 1 + events/events_test.go | 5 +- events/status.go | 17 ++----- events/status_test.go | 109 +++++++++++++++++++++++++++++++++++------- 4 files changed, 102 insertions(+), 30 deletions(-) diff --git a/events/events.go b/events/events.go index 7b64cd2b..ad68849e 100644 --- a/events/events.go +++ b/events/events.go @@ -33,6 +33,7 @@ func (e *EventStatus) String() string { fmt.Fprintf(&buf, "{Name: %q", e.Name) add("UID", string(e.UID)) add("Namespace", e.Namespace) + fmt.Fprint(&buf, "}") return buf.String() } diff --git a/events/events_test.go b/events/events_test.go index c993642a..a0cf928b 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -190,7 +190,10 @@ func TestString(t *testing.T) { event *EventStatus status string }{ - {"event1", &EventStatus{Name: "event1", Namespace: "ns", UID: "event-1"}, `{Name: "event1", UID: "event-1", Namespace: "ns"`}, + {"event1", &EventStatus{Name: "event1", Namespace: "ns", UID: "event-1"}, `{Name: "event1", UID: "event-1", Namespace: "ns"}`}, + {"event2", &normal1eventstatus, normal1eventstring}, + {"event3", &warning1eventstatus, warning1eventstring}, + {"event4", &warning2eventstatus, warning2eventstring}, } { t.Run(tt.name, func(t *testing.T) { status := tt.event.String() diff --git a/events/status.go b/events/status.go index 442b251b..0632d759 100644 --- a/events/status.go +++ b/events/status.go @@ -28,7 +28,6 @@ type Watcher struct { mu sync.Mutex progress bool currentNamespace string - currentEvent types.UID } var errorMsgs = [2]string{"Insufficient memory", "Insufficient cpu"} @@ -92,7 +91,7 @@ func (w *Watcher) Cleanup(err error) error { if err != nil { w.warningf("Create() failed: %v", err) } - w.warningf("Topology creation failed failed: %v", werr) + w.warningf("Topology creation failed: %v", werr) return werr default: } @@ -104,7 +103,7 @@ func (w *Watcher) watch() { for { select { case s, ok := <-w.ch: - if !ok || !w.displayEvent(s) { + if !ok || !w.isEventNormal(s) { return } case <-w.ctx.Done(): @@ -121,16 +120,8 @@ func (w *Watcher) display(format string, v ...any) { } } -func (w *Watcher) displayEvent(s *EventStatus) bool { - newNamespace := s.Namespace != w.currentNamespace - if newNamespace { - w.currentNamespace = s.Namespace - newNamespace = false - } - w.display("NS: %s", s.Namespace) - w.display("Event name: %s", s.Name) - w.display("EventType: %s", s.Type) - w.display("Event message: %s", s.Message) +func (w *Watcher) isEventNormal(s *EventStatus) bool { + w.display("NS: %s Event name: %s Type: %s Message: %s", s.Namespace, s.Name, s.Type, s.Message) message := s.Message for _, m := range errorMsgs { diff --git a/events/status_test.go b/events/status_test.go index f4efcf7c..50151500 100644 --- a/events/status_test.go +++ b/events/status_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/h-fam/errdiff" + "k8s.io/apimachinery/pkg/types" kfake "k8s.io/client-go/kubernetes/fake" ) @@ -75,14 +76,14 @@ func TestCleanup(t *testing.T) { name: "generated_error", werr: error2, want: error2, - output: "Deployment failed: Second Error\n", + output: "Topology creation failed: Second Error\n", }, { name: "passed_and_generated_error", err: error1, werr: error2, want: error2, - output: "Deploy() failed: First Error\nDeployment failed: Second Error\n", + output: "Create() failed: First Error\nTopology creation failed: Second Error\n", }, } { t.Run(tt.name, func(t *testing.T) { @@ -137,32 +138,23 @@ func TestWatcher(t *testing.T) { s: normalEvent, closed: true, output: ` -01:23:45 NS: ns -01:23:45 Event name: event_name -01:23:45 EventType: Normal -01:23:45 Event message: Created container kube-rbac-proxy +01:23:45 NS: ns Event name: event_name Type: Normal Message: Created container kube-rbac-proxy `[1:], }, { - name: "failed", + name: "failed_insufficient_cpu", s: insufficientCPU, closed: true, output: ` -01:23:45 NS: ns -01:23:45 Event name: event_name -01:23:45 EventType: Warning -01:23:45 Event message: 0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. +01:23:45 NS: ns Event name: event_name Type: Warning Message: 0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. `[1:], }, { - name: "failed", + name: "failed_insufficient_memory", s: insufficientMem, closed: true, output: ` -01:23:45 NS: ns -01:23:45 Event name: event_name -01:23:45 EventType: Warning -01:23:45 Event message: 0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. +01:23:45 NS: ns Event name: event_name Type: Warning Message: 0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. `[1:], }, { @@ -232,3 +224,88 @@ func TestNewWatcher(t *testing.T) { t.Errorf("Watcher did not make eventStates") } } + +func TestIsEventNormal(t *testing.T) { + var buf strings.Builder + + canceled := false + cancel := func() { canceled = true } + stopped := false + stop := func() { stopped = true } + + w := newWatcher(context.TODO(), cancel, nil, stop) + w.stdout = &buf + w.SetProgress(true) + + var seen string + + const ( + uid1 = types.UID("uid1") + uid2 = types.UID("uid2") + uid3 = types.UID("uid3") + ) + + for _, tt := range []struct { + name string + event *EventStatus + want string + errch string + stopped bool + canceled bool + }{ + { + name: "normal", + event: &EventStatus{Name: "event1", UID: uid1, Namespace: "ns1", Type: EventNormal, Message: "normal event"}, + want: ` +01:23:45 NS: ns1 Event name: event1 Type: Normal Message: normal event +`[1:], + }, + { + name: "insufficient_cpu", + event: &EventStatus{Name: "event2", UID: uid1, Namespace: "ns1", Type: EventWarning, Message: "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.."}, + want: ` +01:23:45 NS: ns1 Event name: event2 Type: Warning Message: 0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. +`[1:], + errch: "Event failed due to . Message: 0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + canceled: true, + }, + { + name: "insufficient_memory", + event: &EventStatus{Name: "event3", UID: uid1, Namespace: "ns1", Type: EventWarning, Message: "0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.."}, + want: ` +01:23:45 NS: ns1 Event name: event3 Type: Warning Message: 0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod.. +`[1:], + errch: "Event failed due to . Message: 0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..", + canceled: true, + }, + } { + w.isEventNormal(tt.event) + t.Run(tt.name, func(t *testing.T) { + got := buf.String() + if !strings.HasPrefix(got, seen) { + t.Fatalf("got %q, wanted prefix %q", got, seen) + } + seen, got = got, got[len(seen):] + if got != tt.want { + t.Errorf("got %q, want %q", got, tt.want) + } + var errch string + select { + case err := <-w.errCh: + errch = err.Error() + default: + } + if errch != tt.errch { + t.Errorf("got error %s, want error %s", errch, tt.errch) + } + if stopped != tt.stopped { + t.Errorf("got stopped %v, want %v", stopped, tt.stopped) + } + stopped = false + if canceled != tt.canceled { + t.Errorf("got canceled %v, want %v", canceled, tt.canceled) + } + canceled = false + }) + } +} From 60e638677a5532a2789c69a4a62bde05b247c8e9 Mon Sep 17 00:00:00 2001 From: Neha Manjunath Date: Tue, 25 Jul 2023 13:21:19 +0000 Subject: [PATCH 09/11] Incorporate review comments --- events/events.go | 6 +++--- events/status.go | 9 ++++----- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/events/events.go b/events/events.go index ad68849e..15ee0cec 100644 --- a/events/events.go +++ b/events/events.go @@ -1,4 +1,4 @@ -// Package events provides events status for a namespace using kubectl. +// Package events provides events status for a namespace. package events import ( @@ -108,7 +108,7 @@ func EventToStatus(event *corev1.Event) *EventStatus { } event.DeepCopyInto(&s.Event) event = &s.Event - s.Type = event.DeepCopy().Type - s.Message = event.DeepCopy().Message + s.Type = event.Type + s.Message = event.Message return &s } diff --git a/events/status.go b/events/status.go index 0632d759..2df34829 100644 --- a/events/status.go +++ b/events/status.go @@ -25,9 +25,8 @@ type Watcher struct { stdout io.Writer warningf func(string, ...any) - mu sync.Mutex - progress bool - currentNamespace string + mu sync.Mutex + progress bool } var errorMsgs = [2]string{"Insufficient memory", "Insufficient cpu"} @@ -89,9 +88,9 @@ func (w *Watcher) Cleanup(err error) error { select { case werr := <-w.errCh: if err != nil { - w.warningf("Create() failed: %v", err) + w.warningf("Event Watcher failed: %v", err) } - w.warningf("Topology creation failed: %v", werr) + w.warningf("Event Watcher error: %v", werr) return werr default: } From 40ed311fbe85f5659d060f7f821b92c402e40025 Mon Sep 17 00:00:00 2001 From: Neha Manjunath Date: Tue, 25 Jul 2023 13:28:21 +0000 Subject: [PATCH 10/11] Fix unit test failure --- events/status_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/events/status_test.go b/events/status_test.go index 50151500..89c3f743 100644 --- a/events/status_test.go +++ b/events/status_test.go @@ -76,14 +76,14 @@ func TestCleanup(t *testing.T) { name: "generated_error", werr: error2, want: error2, - output: "Topology creation failed: Second Error\n", + output: "Event Watcher error: Second Error\n", }, { name: "passed_and_generated_error", err: error1, werr: error2, want: error2, - output: "Create() failed: First Error\nTopology creation failed: Second Error\n", + output: "Event Watcher failed: First Error\nEvent Watcher error: Second Error\n", }, } { t.Run(tt.name, func(t *testing.T) { From 62fc0affb2f7c4464fe163f5b4d31228f036e214 Mon Sep 17 00:00:00 2001 From: Neha Manjunath Date: Tue, 25 Jul 2023 15:54:19 +0000 Subject: [PATCH 11/11] fix formatting issue --- events/data_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/events/data_test.go b/events/data_test.go index c8bd4e92..ad2a65a0 100644 --- a/events/data_test.go +++ b/events/data_test.go @@ -9,9 +9,9 @@ import ( // Variables are named // // name#data Raw json from kubectl -// name#event Data converted into a corev1.Event -// name#status PodStatus version -// name#string PodStatus as a string +// name#event Data converted into a corev1.Event +// name#status EventStatus version +// name#string EventStatus as a string func json2event(j string) *corev1.Event { var event corev1.Event