Skip to content

Commit

Permalink
feat(cloudevents-server): append tail logs for failed steps (#132)
Browse files Browse the repository at this point in the history
Signed-off-by: wuhuizuo <[email protected]>

---------

Signed-off-by: wuhuizuo <[email protected]>
  • Loading branch information
wuhuizuo authored Apr 16, 2024
1 parent 4961d13 commit e41b124
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 99 deletions.
5 changes: 3 additions & 2 deletions cloudevents-server/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ func (c *TektonNotification) IsMatched(typ, subject string) bool {
}

type Tekton struct {
DashboardBaseURL string `yaml:"dashboard_base_url,omitempty" json:"dashboard_base_url,omitempty"`
Notifications []TektonNotification `yaml:"notifications,omitempty" json:"notifications,omitempty"`
DashboardBaseURL string `yaml:"dashboard_base_url,omitempty" json:"dashboard_base_url,omitempty"`
Notifications []TektonNotification `yaml:"notifications,omitempty" json:"notifications,omitempty"`
FailedStepTailLines int `yaml:"failed_step_tail_lines,omitempty" json:"failed_step_tail_lines,omitempty"`
}

type Config struct {
Expand Down
58 changes: 45 additions & 13 deletions cloudevents-server/pkg/events/custom/tekton/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ package tekton
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
larksdk "github.com/larksuite/oapi-sdk-go/v3"
"github.com/rs/zerolog/log"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
Expand Down Expand Up @@ -62,10 +66,10 @@ func getReceivers(event cloudevents.Event, cfgs []config.TektonNotification) []s
}

// <dashboard base url>/#/namespaces/<namespace>/<run-type>s/<run-name>
// source: /apis///namespaces/<namespace>//<run-name>
// source: /apis/namespaces/<namespace>/<run-name>
// https://tekton.abc.com/tekton/apis/tekton.dev/v1beta1/namespaces/ee-cd/pipelineruns/auto-compose-multi-arch-image-run-g5hqv
//
// "source": "/apis///namespaces/ee-cd//build-package-tikv-tikv-linux-9bn55-build-binaries",
// "source": "/apis/namespaces/ee-cd/build-package-tikv-tikv-linux-9bn55-build-binaries",
func newDetailURL(etype, source, baseURL string) string {
words := strings.Split(source, "/")
runName := words[len(words)-1]
Expand All @@ -79,7 +83,7 @@ func newDetailURL(etype, source, baseURL string) string {
return fmt.Sprintf("%s/#/namespaces/%s/%s/%s", baseURL, runNamespace, runType, runName)
}

func extractLarkInfosFromEvent(event cloudevents.Event, baseURL string) (*cardMessageInfos, error) {
func extractLarkInfosFromEvent(event cloudevents.Event, baseURL string, tailLogLines int) (*cardMessageInfos, error) {
var data tektoncloudevent.TektonCloudEventData
if err := event.DataAs(&data); err != nil {
return nil, err
Expand All @@ -95,12 +99,21 @@ func extractLarkInfosFromEvent(event cloudevents.Event, baseURL string) (*cardMe
case data.PipelineRun != nil:
fillInfosWithPipelineRun(data.PipelineRun, &ret)
if event.Type() == string(tektoncloudevent.PipelineRunFailedEventV1) {
ret.FailedTasks = getFailedTasks(data.PipelineRun)
namespace := data.PipelineRun.Namespace
ret.FailedTasks = getFailedTasks(data.PipelineRun, func(podName, containerName string) string {
logs, _ := getStepLog(baseURL, namespace, podName, containerName, tailLogLines)
return logs
})
}
case data.TaskRun != nil:
fillInfosWithTaskRun(data.TaskRun, &ret)
if event.Type() == string(tektoncloudevent.TaskRunFailedEventV1) {
ret.StepStatuses = getStepStatuses(&data.TaskRun.Status)
namespace := data.TaskRun.Namespace
ret.StepStatuses = getStepStatuses(&data.TaskRun.Status, func(podName, containerName string) string {
logs, _ := getStepLog(baseURL, namespace, podName, containerName, tailLogLines)
log.Debug().Msg(logs)
return logs
})
}
case data.Run != nil:
fillInfosWithCustomRun(data.Run, &ret)
Expand Down Expand Up @@ -143,8 +156,6 @@ func fillInfosWithTaskRun(data *v1beta1.TaskRun, ret *cardMessageInfos) {
ret.Results = append(ret.Results, [2]string{r.Name, string(v)})
}
}

getStepStatuses(&data.Status)
}

func fillInfosWithPipelineRun(data *v1beta1.PipelineRun, ret *cardMessageInfos) {
Expand All @@ -166,32 +177,53 @@ func fillInfosWithPipelineRun(data *v1beta1.PipelineRun, ret *cardMessageInfos)

}

func getFailedTasks(data *v1beta1.PipelineRun) map[string][][2]string {
ret := make(map[string][][2]string)
func getFailedTasks(data *v1beta1.PipelineRun, logGetter func(podName, containerName string) string) map[string][]stepInfo {
ret := make(map[string][]stepInfo)
for _, v := range data.Status.TaskRuns {
succeededCondition := v.Status.GetCondition(apis.ConditionSucceeded)
if !succeededCondition.IsTrue() {
ret[v.PipelineTaskName] = getStepStatuses(v.Status)
ret[v.PipelineTaskName] = getStepStatuses(v.Status, logGetter)
}
}

return ret
}

func getStepStatuses(status *v1beta1.TaskRunStatus) [][2]string {
var ret [][2]string
func getStepStatuses(status *v1beta1.TaskRunStatus, logGetter func(podName, containerName string) string) []stepInfo {
var ret []stepInfo
for _, s := range status.Steps {
if s.Terminated != nil {
ret = append(ret, [2]string{s.Name, s.Terminated.Reason})
if s.Terminated.Reason != "Completed" {
ret = append(ret, stepInfo{s, logGetter(status.PodName, s.ContainerName)})
break
}
ret = append(ret, stepInfo{s, ""})
}
}

return ret
}

func getStepLog(baseURL, ns, podName, containerName string, tailLines int) (string, error) {
apiURL, err := url.JoinPath(baseURL, fmt.Sprintf("api/v1/namespaces/%s/pods/%s/log", ns, podName))
if err != nil {
return "", err
}

resp, err := http.Get(fmt.Sprintf("%s?container=%s&tailLines=%d", apiURL, containerName, tailLines))
if err != nil {
return "", err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}

return string(body), nil
}

func fillTimeFileds(ret *cardMessageInfos, startTime, endTime *metav1.Time) {
if startTime != nil {
ret.StartTime = startTime.Format(time.RFC3339)
Expand Down
14 changes: 11 additions & 3 deletions cloudevents-server/pkg/events/custom/tekton/handler_pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"net/http"
"strings"

"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config"
cloudevents "github.com/cloudevents/sdk-go/v2"
lark "github.com/larksuite/oapi-sdk-go/v3"
"github.com/rs/zerolog/log"
tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"

"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config"
)

const (
Expand Down Expand Up @@ -51,13 +52,20 @@ func (h *pipelineRunHandler) Handle(event cloudevents.Event) cloudevents.Result
} else {
receivers = getReceivers(event, h.Notifications)
}
if len(receivers) == 0 {
return cloudevents.ResultACK
}

infos, err := extractLarkInfosFromEvent(event, h.DashboardBaseURL, h.FailedStepTailLines)
if err != nil {
return cloudevents.ResultNACK
}

log.Debug().
Str("ce-type", event.Type()).
Str("receivers", strings.Join(receivers, ",")).
Msg("send notification for the event type.")

return sendLarkMessages(h.LarkClient, receivers, event, h.DashboardBaseURL)
return composeAndSendLarkMessages(h.LarkClient, receivers, infos)
default:
log.Debug().
Str("handler", "pipelineRunHandler").
Expand Down
14 changes: 11 additions & 3 deletions cloudevents-server/pkg/events/custom/tekton/handler_taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"net/http"
"strings"

"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config"
cloudevents "github.com/cloudevents/sdk-go/v2"
lark "github.com/larksuite/oapi-sdk-go/v3"
"github.com/rs/zerolog/log"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"

"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config"
)

type taskRunHandler struct {
Expand Down Expand Up @@ -47,13 +48,20 @@ func (h *taskRunHandler) Handle(event cloudevents.Event) cloudevents.Result {
} else {
receivers = getReceivers(event, h.Notifications)
}
if len(receivers) == 0 {
return cloudevents.ResultACK
}

infos, err := extractLarkInfosFromEvent(event, h.DashboardBaseURL, h.FailedStepTailLines)
if err != nil {
return err
}

log.Debug().
Str("ce-type", event.Type()).
Str("receivers", strings.Join(receivers, ",")).
Msg("send notification for the event type.")

return sendLarkMessages(h.LarkClient, receivers, event, h.DashboardBaseURL)
return composeAndSendLarkMessages(h.LarkClient, receivers, infos)
}

log.Debug().
Expand Down
90 changes: 49 additions & 41 deletions cloudevents-server/pkg/events/custom/tekton/lark.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package tekton
import (
"bytes"
"context"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"regexp"
"strings"
Expand All @@ -16,7 +19,6 @@ import (
lark "github.com/larksuite/oapi-sdk-go/v3"
larkim "github.com/larksuite/oapi-sdk-go/v3/service/im/v1"
"github.com/rs/zerolog/log"
tektoncloudevent "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"gopkg.in/yaml.v3"

_ "embed"
Expand Down Expand Up @@ -53,53 +55,44 @@ func getReceiverIDType(id string) string {
}
}

func newMessageReq(receiver string, messageRawStr string) *larkim.CreateMessageReq {
return larkim.NewCreateMessageReqBuilder().
ReceiveIdType(getReceiverIDType(receiver)).
Body(
larkim.NewCreateMessageReqBodyBuilder().
MsgType(larkim.MsgTypeInteractive).
ReceiveId(receiver).
Content(messageRawStr).
Build(),
).
Build()
}

func sendLarkMessages(client *lark.Client, receivers []string, event cloudevents.Event, detailBaseUrl string) protocol.Result {
if len(receivers) == 0 {
return cloudevents.ResultACK
}

createMsgReqs, err := newLarkMessages(receivers, event, detailBaseUrl)
func composeAndSendLarkMessages(client *lark.Client, receivers []string, infos *cardMessageInfos) protocol.Result {
createMsgReqs, err := composeLarkMessages(receivers, infos)
if err != nil {
log.Error().Err(err).Msg("compose lark message failed")
return cloudevents.NewHTTPResult(http.StatusInternalServerError, "compose lark message failed: %v", err)
}

for _, createMsgReq := range createMsgReqs {
resp, err := client.Im.Message.Create(context.Background(), createMsgReq)
if err != nil {
log.Error().Err(err).Msg("send lark message failed")
return cloudevents.NewHTTPResult(http.StatusInternalServerError, "send lark message failed: %v", err)
failedAck := sendLarkMessage(client, createMsgReq)
if failedAck != nil {
return failedAck
}
}

if !resp.Success() {
log.Error().Msg(string(resp.RawBody))
return cloudevents.ResultNACK
}
return cloudevents.ResultACK
}

log.Info().
Str("request-id", resp.RequestId()).
Str("message-id", *resp.Data.MessageId).
Msg("send lark message successfully.")
func sendLarkMessage(client *lark.Client, createMsgReq *larkim.CreateMessageReq) error {
resp, err := client.Im.Message.Create(context.Background(), createMsgReq)
if err != nil {
log.Error().Err(err).Msg("send lark message failed")
return cloudevents.NewHTTPResult(http.StatusInternalServerError, "send lark message failed: %v", err)
}

return cloudevents.ResultACK
if !resp.Success() {
log.Error().Msg(string(resp.RawBody))
return cloudevents.ResultNACK
}

log.Info().
Str("request-id", resp.RequestId()).
Str("message-id", *resp.Data.MessageId).
Msg("send lark message successfully.")
return nil
}

func newLarkMessages(receivers []string, event cloudevents.Event, detailBaseUrl string) ([]*larkim.CreateMessageReq, error) {
messageRawStr, err := newLarkCardWithGoTemplate(event, detailBaseUrl)
func composeLarkMessages(receivers []string, infos *cardMessageInfos) ([]*larkim.CreateMessageReq, error) {
messageRawStr, err := newLarkCardWithGoTemplate(infos)
if err != nil {
return nil, err
}
Expand All @@ -112,12 +105,19 @@ func newLarkMessages(receivers []string, event cloudevents.Event, detailBaseUrl
return reqs, nil
}

func newLarkCardWithGoTemplate(event cloudevents.Event, baseURL string) (string, error) {
infos, err := extractLarkInfosFromEvent(event, baseURL)
if err != nil {
return "", err
}
func newMessageReq(receiver string, messageRawStr string) *larkim.CreateMessageReq {
return larkim.NewCreateMessageReqBuilder().
ReceiveIdType(getReceiverIDType(receiver)).
Body(larkim.NewCreateMessageReqBodyBuilder().
MsgType(larkim.MsgTypeInteractive).
ReceiveId(receiver).
Content(messageRawStr).
Uuid(newLarkMsgSHA1Sum(receiver, messageRawStr)).
Build()).
Build()
}

func newLarkCardWithGoTemplate(infos *cardMessageInfos) (string, error) {
tmpl, err := template.New("lark").Funcs(sprig.FuncMap()).Parse(larkTemplateBytes)
if err != nil {
return "", err
Expand Down Expand Up @@ -149,5 +149,13 @@ func newLarkTitle(etype, subject string) string {
runState = typeWords[4]
}

return fmt.Sprintf("%s [%s] %s is %s ", larkCardHeaderEmojis[tektoncloudevent.TektonEventType(etype)], runType, subject, runState)
return fmt.Sprintf("%s [%s] %s is %s ", headerEmoji(etype), runType, subject, runState)
}

func newLarkMsgSHA1Sum(receiver, content string) string {
h := sha1.New()
io.WriteString(h, receiver)
io.WriteString(h, content)

return hex.EncodeToString(h.Sum(nil))
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ elements:
{{- range $t, $ss := . }}
- {{ $t }}:
{{- range $ss }}
1. {{ index . 0 }}: {{ index . 1 }}
1. {{ .Name }}: {{ .Terminated.Reason }}
{{- with .Logs }}
```log
{{ indent 10 . }}
```
{{- end }}
{{- end }}
{{- end }}
{{- end }}
Expand All @@ -51,7 +56,12 @@ elements:
content: |-
**Steps:**
{{- range . }}
1. {{ index . 0 }}: {{ index . 1 }}
1. {{ .Name }}: {{ .Terminated.Reason }}
{{- with .Logs }}
```log
{{ indent 8 . }}
```
{{- end }}
{{- end }}
{{- end }}

Expand Down
Loading

0 comments on commit e41b124

Please sign in to comment.