Skip to content

Commit

Permalink
MGMT-16979: Optimize Alarm server requests
Browse files Browse the repository at this point in the history
Use Alertmanager filter API for optimizing the Alarm server requests.
The supported properties for now are (mapping from O2 to Alertmanager):
* alarmDefinitionID --> alertname
* probableCauseID --> alertname
* extensions/<key> --> <key>
  • Loading branch information
danielerez committed May 5, 2024
1 parent b1a7a90 commit a48d47a
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 10 deletions.
25 changes: 21 additions & 4 deletions internal/service/alarm_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type AlarmFetcher struct {
resourceServerURL string
resourceServerToken string
extensions []string
filters []string
jqTool *jq.Tool
}

Expand All @@ -53,6 +54,7 @@ type AlarmFetcherBuilder struct {
resourceServerURL string
resourceServerToken string
extensions []string
filters []string
}

// NewAlarmFetcher creates a builder that can then be used to configure
Expand Down Expand Up @@ -120,6 +122,13 @@ func (b *AlarmFetcherBuilder) SetExtensions(values ...string) *AlarmFetcherBuild
return b
}

// SetFilters sets the query filter to send to the Alertmanager server. This is optional.
func (b *AlarmFetcherBuilder) SetFilters(
value []string) *AlarmFetcherBuilder {
b.filters = value
return b
}

// Build uses the data stored in the builder to create and configure a new handler.
func (b *AlarmFetcherBuilder) Build() (
result *AlarmFetcher, err error) {
Expand Down Expand Up @@ -190,6 +199,7 @@ func (b *AlarmFetcherBuilder) Build() (
resourceServerURL: b.resourceServerURL,
resourceServerToken: b.resourceServerToken,
extensions: b.extensions,
filters: b.filters,
jqTool: jqTool,
}
return
Expand All @@ -201,7 +211,7 @@ func (r *AlarmFetcher) FetchItems(
ctx context.Context) (alarms data.Stream, err error) {
query := neturl.Values{}
url := r.backendURL + "/alerts"
response, err := r.doGet(ctx, url, r.backendToken, query)
response, err := r.doGet(ctx, url, r.backendToken, query, r.filters)
if err != nil {
return
}
Expand All @@ -222,16 +232,23 @@ func (r *AlarmFetcher) FetchItems(
}

func (r *AlarmFetcher) doGet(ctx context.Context, url, token string,
query neturl.Values) (response *http.Response, err error) {
query neturl.Values, filters []string) (response *http.Response, err error) {
request, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return
}
if query != nil {
// Add filters (if specified)
if filters != nil {
for _, filter := range r.filters {
query.Add("filter", filter)
}
}
request.URL.RawQuery = query.Encode()
}
request.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
request.Header.Set("Accept", "application/json")

response, err = r.backendClient.Do(request)
if err != nil {
return
Expand Down Expand Up @@ -350,7 +367,7 @@ func (r *AlarmFetcher) fetchResourcePool(ctx context.Context, clusterId string)
query := neturl.Values{}
query.Add("filter", fmt.Sprintf("(eq,description,%s)", clusterId))
url := r.resourceServerURL + "/resourcePools"
response, err := r.doGet(ctx, url, r.resourceServerToken, query)
response, err := r.doGet(ctx, url, r.resourceServerToken, query, nil)
if err != nil {
return
}
Expand All @@ -374,7 +391,7 @@ func (r *AlarmFetcher) fetchResource(ctx context.Context, clusterName, resourceN
query.Add("filter", fmt.Sprintf("(eq,description,%s)", resourceName))
path := fmt.Sprintf("/resourcePools/%s/resources", clusterName)
url := r.resourceServerURL + path
response, err := r.doGet(ctx, url, r.resourceServerToken, query)
response, err := r.doGet(ctx, url, r.resourceServerToken, query, nil)
if err != nil {
return
}
Expand Down
116 changes: 116 additions & 0 deletions internal/service/alarm_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"log/slog"
"net/http"
"slices"
Expand Down Expand Up @@ -272,6 +273,7 @@ func (h *AlarmHandler) fetchItems(
SetResourceServerURL(h.resourceServerURL).
SetResourceServerToken(h.resourceServerToken).
SetExtensions(h.extensions...).
SetFilters(h.getQueryFilters(ctx, selector)).
Build()
if err != nil {
return
Expand Down Expand Up @@ -301,3 +303,117 @@ func (h *AlarmHandler) fetchItem(ctx context.Context,

return
}

func (h *AlarmHandler) getQueryFilters(ctx context.Context, selector *search.Selector) (filters []string) {
filters = []string{}

// Add filters from the request params
if selector != nil {
for _, term := range selector.Terms {
filter, err := h.getAlertFilter(ctx, term)
if err != nil || filter == "" {
// fallback to selector filtering
continue
}
h.logger.DebugContext(
ctx,
"Mapped filter term to Alertmanager filter",
slog.String("term", term.String()),
slog.String("mapped filter", filter),
)

filters = append(filters, filter)
}
}

return
}

func (h *AlarmHandler) getAlertFilter(ctx context.Context, term *search.Term) (filter string, err error) {
// Get filter operator
var operator string
operator, err = AlertFilterOp(term.Operator).String()
if err != nil {
h.logFallbackError(
ctx,
slog.String("filter", term.String()),
slog.String("error", err.Error()),
)
return
}

// Validate term values
if len(term.Values) != 1 {
h.logFallbackError(
ctx,
slog.Any("term values", term.Values),
)
return
}

// Map filter property for Alertmanager
var property string
if len(term.Path) == 1 {
property = AlertFilterProperty(term.Path[0]).MapProperty()
if property == "" {
h.logFallbackError(
ctx,
slog.Any("term path", term.Path),
)
return
}
} else if term.Path[0] == "extensions" {
// Support filtering by an extension (e.g. 'extensions/severity')
property = term.Path[1]
} else {
// No filter
return
}

filter = fmt.Sprintf("%s%s%s", property, operator, term.Values[0])

return
}

func (h *AlarmHandler) logFallbackError(ctx context.Context, messages ...any) {
h.logger.ErrorContext(
ctx,
"Failed to map Alertmanager filter term (fallback to selector filtering)",
messages...,
)
}

type AlertFilterOp search.Operator

// String generates an Alertmanager string representation of the operator.
// It panics if used on an unknown operator.
func (o AlertFilterOp) String() (result string, err error) {
switch search.Operator(o) {
case search.Eq:
result = "="
case search.Neq:
result = "!="
case search.Cont:
result = "=~"
case search.Ncont:
result = "!~"
default:
err = fmt.Errorf("unknown operator %d", o)
}
return
}

type AlertFilterProperty string

// MapProperty maps a specified O2 property to the Alertmanager property
func (p AlertFilterProperty) MapProperty() string {
switch p {
case "alarmDefinitionID":
return "alertname"
case "probableCauseID":
return "alertname"
default:
// unknown property
return ""
}
}
82 changes: 76 additions & 6 deletions internal/service/alarm_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,16 +284,21 @@ var _ = Describe("Alarm handler", func() {
Terms: []*search.Term{{
Operator: search.Eq,
Path: []string{
"alarmEventRecordId",
"alarmDefinitionID",
},
Values: []any{
"alert_spoke0",
"ClusterNotUpgradeable",
},
}},
},
})
Expect(err).ToNot(HaveOccurred())
Expect(response).ToNot(BeNil())

// Verify filters
Expect(handler.alarmFetcher.filters).To(ContainElement(
"alertname=ClusterNotUpgradeable",
))
})

It("Accepts multiple filters", func() {
Expand All @@ -309,26 +314,91 @@ var _ = Describe("Alarm handler", func() {
{
Operator: search.Eq,
Path: []string{
"alarmEventRecordId",
"alarmDefinitionID",
},
Values: []any{
"alert_spoke0",
"ClusterNotUpgradeable",
},
},
{
Operator: search.Neq,
Path: []string{
"perceivedSeverity",
"probableCauseID",
},
Values: []any{
"CRITICAL",
"NodeClockNotSynchronising",
},
},
},
},
})
Expect(err).ToNot(HaveOccurred())
Expect(response).ToNot(BeNil())

// Verify filters
Expect(handler.alarmFetcher.filters).To(ContainElement(
"alertname=ClusterNotUpgradeable",
))
Expect(handler.alarmFetcher.filters).To(ContainElement(
"alertname!=NodeClockNotSynchronising",
))
})

It("Accepts a filter by extension", func() {
// Prepare the backend:
backend.AppendHandlers(
RespondWithItems(),
)

// Send the request:
response, err := handler.List(ctx, &ListRequest{
Selector: &search.Selector{
Terms: []*search.Term{{
Operator: search.Eq,
Path: []string{
"extensions",
"severity",
},
Values: []any{
"critical",
},
}},
},
})
Expect(err).ToNot(HaveOccurred())
Expect(response).ToNot(BeNil())

// Verify filters
Expect(handler.alarmFetcher.filters).To(ContainElement(
"severity=critical",
))
})

It("No filter for unknown property", func() {
// Prepare the backend:
backend.AppendHandlers(
RespondWithItems(),
)

// Send the request:
response, err := handler.List(ctx, &ListRequest{
Selector: &search.Selector{
Terms: []*search.Term{{
Operator: search.Eq,
Path: []string{
"unknown",
},
Values: []any{
"value",
},
}},
},
})
Expect(err).ToNot(HaveOccurred())
Expect(response).ToNot(BeNil())

// Verify filters
Expect(handler.alarmFetcher.filters).To(HaveLen(0))
})

It("Adds configurable extensions", func() {
Expand Down

0 comments on commit a48d47a

Please sign in to comment.