Skip to content

Commit

Permalink
chore: introduce event filtering on pipeline and vertex (#1294)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Oct 31, 2023
1 parent 32416bf commit 3e39ef0
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
32 changes: 20 additions & 12 deletions server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,38 +731,46 @@ func (h *handler) streamLogs(c *gin.Context, stream io.ReadCloser) {
}

// GetNamespaceEvents gets a list of events for the given namespace.
// It supports filtering by object type and object name.
// If objectType and objectName are specified in the request, only the events that match both will be returned.
// If objectType and objectName are not specified, all the events for the given name space will be returned.
// Events are sorted by timestamp in descending order.
func (h *handler) GetNamespaceEvents(c *gin.Context) {
ns := c.Param("namespace")

objType := c.DefaultQuery("objectType", "")
objName := c.DefaultQuery("objectName", "")
if (objType == "" && objName != "") || (objType != "" && objName == "") {
h.respondWithError(c, fmt.Sprintf("Failed to get a list of events: namespace %q: "+
"please either specify both objectType and objectName or not specify.", ns))
return
}
limit, _ := strconv.ParseInt(c.Query("limit"), 10, 64)
events, err := h.kubeClient.CoreV1().Events(ns).List(context.Background(), metav1.ListOptions{
var err error
var events *corev1.EventList
if events, err = h.kubeClient.CoreV1().Events(ns).List(context.Background(), metav1.ListOptions{
Limit: limit,
Continue: c.Query("continue"),
})
if err != nil {
}); err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to get a list of events: namespace %q: %s", ns, err.Error()))
return
}

var (
response []K8sEventsResponse
defaultTimeObject time.Time
)

for _, event := range events.Items {
if event.LastTimestamp.Time == defaultTimeObject {
continue
}
var newEvent = NewK8sEventsResponse(event.LastTimestamp.UnixMilli(), event.Type, event.InvolvedObject.Kind, event.InvolvedObject.Name, event.Reason, event.Message)
response = append(response, newEvent)
if (objType == "" && objName == "") ||
(strings.EqualFold(event.InvolvedObject.Kind, objType) && strings.EqualFold(event.InvolvedObject.Name, objName)) {
newEvent := NewK8sEventsResponse(event.LastTimestamp.UnixMilli(), event.Type, event.InvolvedObject.Kind, event.InvolvedObject.Name, event.Reason, event.Message)
response = append(response, newEvent)
}
}

// sort the events by timestamp
// from most recent events to older events
sort.Slice(response, func(i int, j int) bool {
return response[i].TimeStamp >= response[j].TimeStamp
})

c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, response))
}

Expand Down
7 changes: 4 additions & 3 deletions server/apis/v1/response_k8s_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import "fmt"
type K8sEventsResponse struct {
TimeStamp int64 `json:"timestamp"`
Type string `json:"type"`
Object string `json:"object"`
Reason string `json:"reason"`
Message string `json:"message"`
// Object is in the format of "kind/name", e.g. "Pipeline/simple-pipeline"
Object string `json:"object"`
Reason string `json:"reason"`
Message string `json:"message"`
}

// NewK8sEventsResponse creates a new K8sEventsResponse object with the given inputs.
Expand Down

0 comments on commit 3e39ef0

Please sign in to comment.