Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event watcher for topo create #399

Merged
merged 11 commits into from
Jul 25, 2023
218 changes: 218 additions & 0 deletions events/data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package events

import (
"encoding/json"

corev1 "k8s.io/api/core/v1"
)

// Variables are named
//
// name#data Raw json from kubectl
// name#event Data converted into a corev1.Event
// name#status EventStatus version
// name#string EventStatus as a string

func json2event(j string) *corev1.Event {
var event corev1.Event
if err := json.Unmarshal(([]byte)(j), &event); err != nil {
panic(err)
}
return &event
}

var (
normal1event = json2event(normal1eventdata)
warning1event = json2event(warning1eventdata)
warning2event = json2event(warning2eventdata)
)

// We only use 3 events when checking EventStatus.String as these cover all the cases.
var (
normal1eventstring = `{Name: "service-r2.1772fc5571f425ff", UID: "a756a650-bf61-4da6-8d69-d5ae40bd943c", Namespace: "ceos2"}`
warning1eventstring = `{Name: "r1.177351c307a38f38", UID: "a0450cc2-26e5-46e6-8ef8-be9dec02cd35", Namespace: "ceos1"}`
warning2eventstring = `{Name: "r1.17735337a69ffd33", UID: "efa6a664-ccf3-4ba8-a30b-09e0ebeaa269", Namespace: "ceos6"}`
)

var (
normal1eventdata = `
{
"metadata": {
"name": "service-r2.1772fc5571f425ff",
"namespace": "ceos2",
"uid": "a756a650-bf61-4da6-8d69-d5ae40bd943c",
"resourceVersion": "3243",
"creationTimestamp": "2023-07-18T14:24:14Z",
"managedFields": [
{
"manager": "speaker",
"operation": "Update",
"apiVersion": "v1",
"time": "2023-07-18T14:24:14Z",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:count": {},
"f:firstTimestamp": {},
"f:involvedObject": {},
"f:lastTimestamp": {},
"f:message": {},
"f:reason": {},
"f:source": {
"f:component": {}
},
"f:type": {}
}
}
]
},
"involvedObject": {
"kind": "Service",
"namespace": "ceos2",
"name": "service-r2",
"uid": "2459a3c5-c353-4a50-aa70-dc897ffb051e",
"apiVersion": "v1",
"resourceVersion": "3076"
},
"reason": "nodeAssigned",
"message": "announcing from node \"kne-control-plane\" with protocol \"layer2\"",
"source": {
"component": "metallb-speaker"
},
"firstTimestamp": "2023-07-18T14:24:14Z",
"lastTimestamp": "2023-07-18T14:24:14Z",
"count": 1,
"type": "Normal",
"eventTime": null,
"reportingComponent": "",
"reportingInstance": ""
}
`

normal1eventstatus = EventStatus{
Name: "service-r2.1772fc5571f425ff",
UID: "a756a650-bf61-4da6-8d69-d5ae40bd943c",
Namespace: "ceos2",
Message: "announcing from node \"kne-control-plane\" with protocol \"layer2\"",
Type: "Normal",
}

warning1eventdata = `
{
"metadata": {
"name": "r1.177351c307a38f38",
"namespace": "ceos1",
"uid": "a0450cc2-26e5-46e6-8ef8-be9dec02cd35",
"resourceVersion": "325871",
"creationTimestamp": "2023-07-19T16:29:43Z",
"managedFields": [
{
"manager": "kube-scheduler",
"operation": "Update",
"apiVersion": "v1",
"time": "2023-07-19T16:39:50Z",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:count": {},
"f:firstTimestamp": {},
"f:involvedObject": {},
"f:lastTimestamp": {},
"f:message": {},
"f:reason": {},
"f:source": {
"f:component": {}
},
"f:type": {}
}
}
]
},
"involvedObject": {
"kind": "Pod",
"namespace": "ceos1",
"name": "r1",
"uid": "b69a3fd5-2f25-42aa-91c4-146c40e53053",
"apiVersion": "v1",
"resourceVersion": "323358"
},
"reason": "FailedScheduling",
"message": "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..",
"source": {
"component": "default-scheduler"
},
"firstTimestamp": "2023-07-19T16:29:43Z",
"lastTimestamp": "2023-07-19T16:39:50Z",
"count": 3,
"type": "Warning",
"eventTime": null,
"reportingComponent": "",
"reportingInstance": ""
}
`
warning1eventstatus = EventStatus{
Name: "r1.177351c307a38f38",
UID: "a0450cc2-26e5-46e6-8ef8-be9dec02cd35",
Namespace: "ceos1",
Message: "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..",
Type: "Warning",
}

warning2eventdata = `
{
"metadata": {
"name": "r1.17735337a69ffd33",
"namespace": "ceos6",
"uid": "efa6a664-ccf3-4ba8-a30b-09e0ebeaa269",
"resourceVersion": "1621",
"creationTimestamp": "2023-07-19T16:56:23Z",
"managedFields": [
{
"manager": "kube-scheduler",
"operation": "Update",
"apiVersion": "v1",
"time": "2023-07-19T16:56:23Z",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:count": {},
"f:firstTimestamp": {},
"f:involvedObject": {},
"f:lastTimestamp": {},
"f:message": {},
"f:reason": {},
"f:source": {
"f:component": {}
},
"f:type": {}
}
}
]
},
"involvedObject": {
"kind": "Pod",
"namespace": "ceos6",
"name": "r1",
"uid": "140da1fb-0ff4-4b9d-b512-41c64bd9b1e1",
"apiVersion": "v1",
"resourceVersion": "1620"
},
"reason": "FailedScheduling",
"message": "0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..",
"source": {
"component": "default-scheduler"
},
"firstTimestamp": "2023-07-19T16:56:23Z",
"lastTimestamp": "2023-07-19T16:56:23Z",
"count": 1,
"type": "Warning",
"eventTime": null,
"reportingComponent": "",
"reportingInstance": ""
}
`
warning2eventstatus = EventStatus{
Name: "r1.17735337a69ffd33",
UID: "efa6a664-ccf3-4ba8-a30b-09e0ebeaa269",
Namespace: "ceos6",
Message: "0/1 nodes are available: 1 Insufficient memory. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..",
Type: "Warning",
}
)
114 changes: 114 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Package events provides events status for a namespace.
package events

import (
"context"
"errors"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
)

// A EventStatus represents the status of a single Event.
type EventStatus struct {
Name string // name of event
UID types.UID
Namespace string
Message string
Type string
Event corev1.Event // copy of the raw event
}

func (e *EventStatus) String() string {
var buf strings.Builder
add := func(k, v string) {
if v != "" {
fmt.Fprintf(&buf, ", %s: %q", k, v)
}
}
fmt.Fprintf(&buf, "{Name: %q", e.Name)
add("UID", string(e.UID))
add("Namespace", e.Namespace)
fmt.Fprint(&buf, "}")
return buf.String()
}

func (e *EventStatus) Equal(q *EventStatus) bool {
if e.UID != q.UID ||
e.Name != q.Name ||
e.Namespace != q.Namespace {
return false
}
return true
}

// Values for EventType. These constants are copied from k8s.io/api/core/v1 as a
// convenience.
const (
EventNormal = corev1.EventTypeNormal
EventWarning = corev1.EventTypeWarning
)

// GetEventStatus returns the status of the events found in the supplied namespace.
func GetEventStatus(ctx context.Context, client kubernetes.Interface, namespace string) ([]*EventStatus, error) {
events, err := client.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
var statuses []*EventStatus
for _, event := range events.Items {
event := event
statuses = append(statuses, EventToStatus(&event))
}
return statuses, nil
}

// WatchEventStatus returns a channel on which the status of the events in the
// supplied namespace are written.
func WatchEventStatus(ctx context.Context, client kubernetes.Interface, namespace string) (chan *EventStatus, func(), error) {
if ctx == nil {
return nil, nil, errors.New("WatchEventStatus: nil context ")
}
if client == nil {
return nil, nil, errors.New("WatchEventStatus: nil client ")
}
w, err := client.CoreV1().Events(namespace).Watch(ctx, metav1.ListOptions{})
if err != nil {
return nil, nil, err
}
kch := w.ResultChan()
ch := make(chan *EventStatus, 2)
initialTimestamp := metav1.Now()
alexmasi marked this conversation as resolved.
Show resolved Hide resolved

go func() {
defer close(ch)
for event := range kch {
switch e := event.Object.(type) {
case *corev1.Event:
if !e.CreationTimestamp.Before(&initialTimestamp) {
s := EventToStatus(e)
ch <- s
}
}
}
}()
return ch, w.Stop, nil
}

// EventToStatus returns a pointer to a new EventStatus for an event.
func EventToStatus(event *corev1.Event) *EventStatus {
s := EventStatus{
Name: event.ObjectMeta.Name,
Namespace: event.ObjectMeta.Namespace,
UID: event.ObjectMeta.UID,
}
event.DeepCopyInto(&s.Event)
event = &s.Event
s.Type = event.Type
s.Message = event.Message
return &s
}
Loading
Loading