Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(alerta-service): Add custom severities to Alerta handler #2584

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [#2559](https://github.com/influxdata/kapacitor/pull/2559): kapacitor cli supports flux tasks
- [#2560](https://github.com/influxdata/kapacitor/pull/2560): enable new-style slack apps
- [#2576](https://github.com/influxdata/kapacitor/pull/2576): shared secret auth to influxdb in OSS
- [#2584](https://github.com/influxdata/kapacitor/pull/2584): Add custom severities to Alerta handler

### Bugfixes
- [#2564](https://github.com/influxdata/kapacitor/pull/2564): Fix a panic in the scraper handler when debug mode is enabled
Expand Down
17 changes: 17 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,23 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
if a.Environment != "" {
c.Environment = a.Environment
}
if len(a.RenameSeverities) != 0 {
c.RenameSeverities = a.RenameSeverities
}
if len(a.ExtraSeverities) != 0 {
c.ExtraSeverityExpressions = make([]stateful.Expression, len(a.ExtraSeverities))
c.ExtraSeverityNames = make([]string, len(a.ExtraSeverities))
c.ExtraSeverityScopePools = make([]stateful.ScopePool, len(a.ExtraSeverities))
for i, severity := range a.ExtraSeverities {
statefulExpression, expressionCompileError := stateful.NewExpression(severity.Condition.Expression)
if expressionCompileError != nil {
return nil, fmt.Errorf("Failed to compile stateful expression for Alerta extra severity %s: %s", severity.Name, expressionCompileError)
}
c.ExtraSeverityExpressions[i] = statefulExpression
c.ExtraSeverityNames[i] = severity.Name
c.ExtraSeverityScopePools[i] = stateful.NewScopePool(ast.FindReferenceVariables(severity.Condition.Expression))
}
}
if a.Group != "" {
c.Group = a.Group
}
Expand Down
25 changes: 25 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9197,11 +9197,18 @@ stream
.token('testtoken1234567')
.environment('production')
.timeout(1h)
.alerta()
.token('testtoken7654321')
.environment('production')
.timeout(1h)
.addSeverity('fatal', 1, lambda: "count" > 9.0)
.addSeverity('disaster', 0, lambda: "count" > 15.0)
.alerta()
.token('anothertesttoken')
.resource('resource: {{ index .Tags "host" }}')
.event('event: {{ .TaskName }}')
.environment('{{ index .Tags "host" }}')
.renameSeverity('crit', 'major')
.origin('override')
.group('{{ .ID }}')
.value('{{ index .Fields "count" }}')
Expand All @@ -9227,6 +9234,23 @@ stream
Event: "serverA",
Group: "host=serverA",
Environment: "production",
Severity: "critical",
Text: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
Origin: "Kapacitor",
Service: []string{"cpu"},
Correlate: []string{"cpu"},
Timeout: 3600,
},
},
alertatest.Request{
URL: "/alert",
Authorization: "Bearer testtoken7654321",
PostData: alertatest.PostData{
Resource: "cpu",
Event: "serverA",
Group: "host=serverA",
Environment: "production",
Severity: "fatal",
Text: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
Origin: "Kapacitor",
Service: []string{"cpu"},
Expand All @@ -9242,6 +9266,7 @@ stream
Event: "event: TestStream_Alert",
Group: "serverA",
Environment: "serverA",
Severity: "major",
Text: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
Origin: "override",
Service: []string{"serviceA", "serviceB", "cpu"},
Expand Down
32 changes: 32 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -1219,6 +1220,12 @@ func (n *AlertNodeData) Alerta() *AlertaHandler {
return alerta
}

type AlertaCustomSeverity struct {
Name string `json:"name"`
Code int64 `json:"code"`
Condition *ast.LambdaNode `json:"condition"`
}

// tick:embedded:AlertNode.Alerta
type AlertaHandler struct {
*AlertNodeData `json:"-"`
Expand All @@ -1242,6 +1249,13 @@ type AlertaHandler struct {
// Defaut is set from the configuration.
Environment string `json:"environment"`

// Alerta supports many different severity levels. And it allows you to add even more.
// To benefit from this model we can use two ways:
// Rename kapacitor built-in severities to match some of alerta's severities
RenameSeverities map[string]string `tick:"RenameSeverity" json:"rename-severities"`
// Add post-processing to kapacitor alerts to fine tune severity levels
ExtraSeverities []*AlertaCustomSeverity `tick:"AddSeverity" json:"add-severities"`

// Alerta group.
// Can be a template and has access to the same data as the AlertNode.Details property.
// Default: {{ .Group }}
Expand All @@ -1268,6 +1282,24 @@ type AlertaHandler struct {
Timeout time.Duration `json:"timeout"`
}

func (a *AlertaHandler) RenameSeverity(kapacitorName string, alertaName string) *AlertaHandler {
if a.RenameSeverities == nil {
a.RenameSeverities = make(map[string]string)
}
a.RenameSeverities[kapacitorName] = alertaName
return a
}
func (a *AlertaHandler) AddSeverity(name string, code int64, condition *ast.LambdaNode) *AlertaHandler {
a.ExtraSeverities = append(a.ExtraSeverities, &AlertaCustomSeverity{
Name: name,
Code: code,
Condition: condition,
})
// Alerta severities have descending order: higher severity has lower code (1 for critical, 9 for ok)
sort.SliceStable(a.ExtraSeverities, func(i, j int) bool { return a.ExtraSeverities[i].Code < a.ExtraSeverities[j].Code })
return a
}

// List of effected services.
// If not specified defaults to the Name of the stream.
// tick:property
Expand Down
16 changes: 14 additions & 2 deletions pipeline/tick/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,20 @@ func (n *AlertNode) Build(a *pipeline.AlertNode) (ast.Node, error) {
Dot("token", h.Token).
Dot("resource", h.Resource).
Dot("event", h.Event).
Dot("environment", h.Environment).
Dot("group", h.Group).
Dot("environment", h.Environment)

var severitiesOrder = []string{"ok", "info", "warn", "crit"}
for _, k := range severitiesOrder {
if val, ok := h.RenameSeverities[k]; ok {
n.Dot("renameSeverity", k, val)
}
}

for _, k := range h.ExtraSeverities {
n.Dot("addSeverity", k.Name, k.Code, k.Condition)
}

n.Dot("group", h.Group).
Dot("value", h.Value).
Dot("origin", h.Origin).
Dot("services", args(h.Service)...).
Expand Down
10 changes: 10 additions & 0 deletions pipeline/tick/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,14 @@ func TestAlertAlerta(t *testing.T) {
handler.Resource = "Harbinger"
handler.Event = "Jump through Omega-4 Relay"
handler.Environment = "Collector base"
handler.RenameSeverities = make(map[string]string)
handler.RenameSeverities["info"] = "notice"
handler.ExtraSeverities = make([]*pipeline.AlertaCustomSeverity, 1)
handler.ExtraSeverities[0] = &pipeline.AlertaCustomSeverity{
Name: "major",
Code: 2,
Condition: newLambda(85),
}
handler.Group = "I brought Jack, Miranda and Tali"
handler.Value = "Save the Galaxy"
handler.Origin = "Omega"
Expand All @@ -680,6 +688,8 @@ func TestAlertAlerta(t *testing.T) {
.resource('Harbinger')
.event('Jump through Omega-4 Relay')
.environment('Collector base')
.renameSeverity('info', 'notice')
.addSeverity('major', 2, lambda: "cpu" > 85)
.group('I brought Jack, Miranda and Tali')
.value('Save the Galaxy')
.origin('Omega')
Expand Down
1 change: 1 addition & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10149,6 +10149,7 @@ func TestServer_AlertHandlers(t *testing.T) {
Event: "id",
Group: "test",
Environment: "env",
Severity: "critical",
Text: "message",
Origin: "kapacitor",
Service: []string{"alert"},
Expand Down
1 change: 1 addition & 0 deletions services/alerta/alertatest/alertatest.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type PostData struct {
Event string `json:"event"`
Group string `json:"group"`
Environment string `json:"environment"`
Severity string `json:"severity"`
Text string `json:"text"`
Origin string `json:"origin"`
Service []string `json:"service"`
Expand Down
125 changes: 108 additions & 17 deletions services/alerta/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
khttp "github.com/influxdata/kapacitor/http"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/tick/ast"
"github.com/influxdata/kapacitor/tick/stateful"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -267,6 +269,16 @@ type HandlerConfig struct {
// Defaut is set from the configuration.
Environment string `mapstructure:"environment"`

// Renaming rules for severities
// Allows to rewrite build-in kapacitor severities (info, warn, crit) to any of Alerta multiple severities
RenameSeverities map[string]string `mapstructure:"rename-severities"`

// Expressions for custom Alerta severities
// Allows to fine tune severity levels of kapacitor
ExtraSeverityExpressions []stateful.Expression `mapstructure:"severity-expressions"`
ExtraSeverityNames []string `mapstructure:"severity-names"`
ExtraSeverityScopePools []stateful.ScopePool `mapstructure:"severity-scope-pool"`

// Alerta group.
// Can be a template and has access to the same data as the AlertNode.Details property.
// Default: {{ .Group }}
Expand Down Expand Up @@ -297,13 +309,17 @@ type handler struct {
c HandlerConfig
diag Diagnostic

resourceTmpl *text.Template
eventTmpl *text.Template
environmentTmpl *text.Template
valueTmpl *text.Template
groupTmpl *text.Template
serviceTmpl []*text.Template
correlateTmpl []*text.Template
resourceTmpl *text.Template
eventTmpl *text.Template
environmentTmpl *text.Template
renameSeverities map[string]string
severityLevels []string
severityExpressions []stateful.Expression
scopePools []stateful.ScopePool
valueTmpl *text.Template
groupTmpl *text.Template
serviceTmpl []*text.Template
correlateTmpl []*text.Template
}

func (s *Service) DefaultHandlerConfig() HandlerConfig {
Expand Down Expand Up @@ -357,16 +373,20 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, er
}

return &handler{
s: s,
c: c,
diag: s.diag.WithContext(ctx...),
resourceTmpl: rtmpl,
eventTmpl: evtmpl,
environmentTmpl: etmpl,
groupTmpl: gtmpl,
valueTmpl: vtmpl,
serviceTmpl: stmpl,
correlateTmpl: ctmpl,
s: s,
c: c,
diag: s.diag.WithContext(ctx...),
resourceTmpl: rtmpl,
eventTmpl: evtmpl,
environmentTmpl: etmpl,
renameSeverities: c.RenameSeverities,
severityLevels: c.ExtraSeverityNames,
severityExpressions: c.ExtraSeverityExpressions,
scopePools: c.ExtraSeverityScopePools,
groupTmpl: gtmpl,
valueTmpl: vtmpl,
serviceTmpl: stmpl,
correlateTmpl: ctmpl,
}, nil
}

Expand Down Expand Up @@ -468,20 +488,40 @@ func (h *handler) Handle(event alert.Event) {
}

var severity string
var severityKey string

switch event.State.Level {
case alert.OK:
severity = "ok"
severityKey = "ok"
case alert.Info:
severity = "informational"
severityKey = "info"
case alert.Warning:
severity = "warning"
severityKey = "warn"
case alert.Critical:
severity = "critical"
severityKey = "crit"
default:
severity = "indeterminate"
}

if val, ok := h.renameSeverities[severityKey]; ok {
severity = val
}

if len(h.severityLevels) != 0 {
for i, expression := range h.severityExpressions {
if pass, err := EvalPredicate(expression, h.scopePools[i], event.State.Time, event.Data.Fields, event.Data.Tags); err != nil {
h.diag.Error("error evaluating expression for Alerta severity", err)
} else if pass {
severity = h.severityLevels[i]
break
}
}
}

if err := h.s.Alert(
h.c.Token,
h.c.TokenPrefix,
Expand All @@ -502,3 +542,54 @@ func (h *handler) Handle(event alert.Event) {
h.diag.Error("failed to send event to Alerta", err)
}
}

func EvalPredicate(se stateful.Expression, scopePool stateful.ScopePool, now time.Time, fields models.Fields, tags models.Tags) (bool, error) {
vars := scopePool.Get()
defer scopePool.Put(vars)
err := fillScope(vars, scopePool.ReferenceVariables(), now, fields, tags)
if err != nil {
return false, err
}

// for function signature check
if _, err := se.Type(vars); err != nil {
return false, err
}

return se.EvalBool(vars)
}

// fillScope - given a scope and reference variables, we fill the exact variables from the now, fields and tags.
func fillScope(vars *stateful.Scope, referenceVariables []string, now time.Time, fields models.Fields, tags models.Tags) error {
for _, refVariableName := range referenceVariables {
if refVariableName == "time" {
vars.Set("time", now.Local())
continue
}

// Support the error with tags/fields collision
var fieldValue interface{}
var isFieldExists bool
var tagValue interface{}
var isTagExists bool

if fieldValue, isFieldExists = fields[refVariableName]; isFieldExists {
vars.Set(refVariableName, fieldValue)
}

if tagValue, isTagExists = tags[refVariableName]; isTagExists {
if isFieldExists {
return fmt.Errorf("cannot have field and tags with same name %q", refVariableName)
}
vars.Set(refVariableName, tagValue)
}
if !isFieldExists && !isTagExists {
if !vars.Has(refVariableName) {
vars.Set(refVariableName, ast.MissingValue)
}

}
}

return nil
}