Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DEV-44293 - Support Logz Datasource #19

Merged
merged 7 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions custom.ini
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ news_feed_enabled = false

[alerting]
enabled = false
execute_alerts = false
execute_alerts = true
min_interval_seconds = 60

[unified_alerting]
enabled = true
execute_alerts = false
execute_alerts = true
alert_manager_enabled = true
scheduled_evaluation_enabled = false

[security]
disable_gravatar = true
Expand All @@ -81,4 +82,4 @@ min_refresh_interval = 30s

[log.frontend]
enabled = true
custom_endpoint = log
custom_endpoint = log
5 changes: 4 additions & 1 deletion pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ func (hs *HTTPServer) QueryMetricsV2(c *contextmodel.ReqContext) response.Respon
return response.Error(http.StatusBadRequest, "bad request data", err)
}

resp, err := hs.queryDataService.QueryData(c.Req.Context(), c.SignedInUser, c.SkipDSCache, reqDTO)
// LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support
ctxWithLogzHeaders := context.WithValue(c.Req.Context(), "logzioHeaders", c.Req.Header)
resp, err := hs.queryDataService.QueryData(ctxWithLogzHeaders, c.SignedInUser, c.SkipDSCache, reqDTO)
// LOGZ.IO GRAFANA CHANGE :: End
if err != nil {
return hs.handleQueryMetricsError(err)
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/expr/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"

Expand Down Expand Up @@ -347,7 +348,15 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc()
}()

resp, err := s.dataService.QueryData(ctx, req)
// LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support
logzHeaders := http.Header{}
for k, v := range req.Headers {
logzHeaders[k] = []string{v}
}
ctxWithLogzio := context.WithValue(ctx, "logzioHeaders", logzHeaders)

resp, err := s.dataService.QueryData(ctxWithLogzio, req)
// LOGZ.IO GRAFANA CHANGE :: End
if err != nil {
return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/models/headers_logzio.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type LogzIoHeaders struct {

var logzioHeadersWhitelist = []string{
"user-context",
"X-Logz-Query-Context",
"Query-Source",
yasmin-tr marked this conversation as resolved.
Show resolved Hide resolved
}

func (logzioHeaders *LogzIoHeaders) GetDatasourceQueryHeaders(grafanaGeneratedHeaders http.Header) http.Header {
Expand All @@ -19,7 +21,11 @@ func (logzioHeaders *LogzIoHeaders) GetDatasourceQueryHeaders(grafanaGeneratedHe

for _, whitelistedHeader := range logzioHeadersWhitelist {
if requestHeader := logzioGrafanaRequestHeaders.Get(whitelistedHeader); requestHeader != "" {
datasourceRequestHeaders.Set(whitelistedHeader, requestHeader)
if whitelistedHeader == "X-Logz-Query-Context" {
datasourceRequestHeaders.Set("User-Context", requestHeader)
} else {
datasourceRequestHeaders.Set(whitelistedHeader, requestHeader)
}
}
}

Expand Down
12 changes: 1 addition & 11 deletions pkg/services/ngalert/api/alerting_logzio.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,11 @@ func (srv *LogzioAlertingService) RouteEvaluateAlert(c *contextmodel.ReqContext,
for _, evalRequest := range evalRequests {
c.Logger.Info("Evaluate Alert API", "evalTime", evalRequest.EvalTime, "ruleTitle", evalRequest.AlertRule.Title, "ruleUID", evalRequest.AlertRule.UID)

var dsOverrideByDsUid = map[string]ngmodels.EvaluationDatasourceOverride{}
if evalRequest.DsOverrides != nil {
for _, dsOverride := range evalRequest.DsOverrides {
dsOverrideByDsUid[dsOverride.DsUid] = dsOverride
}
}

evalReq := ngmodels.ExternalAlertEvaluationRequest{
AlertRule: evalRequest.AlertRule,
EvalTime: evalRequest.EvalTime,
FolderTitle: evalRequest.FolderTitle,
LogzioEvalContext: ngmodels.LogzioAlertRuleEvalContext{
LogzioHeaders: c.Req.Header,
DsOverrideByDsUid: dsOverrideByDsUid,
},
LogzHeaders: c.Req.Header,
}
err := srv.Schedule.RunRuleEvaluation(c.Req.Context(), evalReq)

Expand Down
10 changes: 8 additions & 2 deletions pkg/services/ngalert/api/api_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ func (srv TestingApiSrv) RouteTestGrafanaRuleConfig(c *contextmodel.ReqContext,
}
}

evaluator, err := srv.evaluator.Create(eval.NewContext(c.Req.Context(), c.SignedInUser), rule.GetEvalCondition())
// LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support
ctxWithLogzHeaders := context.WithValue(c.Req.Context(), "logzioHeaders", c.Req.Header)
evaluator, err := srv.evaluator.Create(eval.NewContext(ctxWithLogzHeaders, c.SignedInUser), rule.GetEvalCondition())
// LOGZ.IO GRAFANA CHANGE :: End
if err != nil {
return ErrResp(http.StatusBadRequest, err, "Failed to build evaluator for queries and expressions")
}
Expand Down Expand Up @@ -183,7 +186,10 @@ func (srv TestingApiSrv) RouteEvalQueries(c *contextmodel.ReqContext, cmd apimod
}
}

evaluator, err := srv.evaluator.Create(eval.NewContext(c.Req.Context(), c.SignedInUser), cond)
// LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support
ctxWithLogzHeaders := context.WithValue(c.Req.Context(), "logzioHeaders", c.Req.Header)
evaluator, err := srv.evaluator.Create(eval.NewContext(ctxWithLogzHeaders, c.SignedInUser), cond)
// LOGZ.IO GRAFANA CHANGE :: End

if err != nil {
return ErrResp(http.StatusBadRequest, err, "Failed to build evaluator for queries and expressions")
Expand Down
7 changes: 3 additions & 4 deletions pkg/services/ngalert/api/tooling/definitions/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,10 +562,9 @@ type AlertSendNotificationsRequest struct {

// LOGZ.IO GRAFANA CHANGE :: DEV-43744: add api models for alert evaluation requests/responses
type AlertEvaluationRequest struct {
AlertRule models.AlertRule `json:"alertRule"`
EvalTime time.Time `json:"evalTime"`
FolderTitle string `json:"folderTitle"`
DsOverrides []models.EvaluationDatasourceOverride `json:"dsOverrides"`
AlertRule models.AlertRule `json:"alertRule"`
EvalTime time.Time `json:"evalTime"`
FolderTitle string `json:"folderTitle"`
}

type AlertEvalRunResult struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/ngalert/backtesting/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func newBacktestingEvaluator(ctx context.Context, evalFactory eval.EvaluatorFact
}
}

evaluator, err := evalFactory.Create(eval.NewContextWithPreviousResults(ctx, user, reader, nil), condition) // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add logzio datasources support
evaluator, err := evalFactory.Create(eval.NewContextWithPreviousResults(ctx, user, reader), condition)

if err != nil {
return nil, err
Expand Down
29 changes: 3 additions & 26 deletions pkg/services/ngalert/eval/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package eval

import (
"context"
"net/http" // LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts

"github.com/grafana/grafana-plugin-sdk-go/data"

"github.com/grafana/grafana/pkg/services/auth/identity"
"github.com/grafana/grafana/pkg/services/ngalert/models" // LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts
)

// AlertingResultsReader provides fingerprints of results that are in alerting state.
Expand All @@ -21,39 +18,19 @@ type EvaluationContext struct {
Ctx context.Context
User identity.Requester
AlertingResultsReader AlertingResultsReader
LogzioEvalContext *models.LogzioAlertRuleEvalContext // LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts
}

func NewContext(ctx context.Context, user identity.Requester) EvaluationContext {
// LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts
logzioEvalContext := &models.LogzioAlertRuleEvalContext{
LogzioHeaders: http.Header{},
DsOverrideByDsUid: map[string]models.EvaluationDatasourceOverride{},
}
// LOGZ.IO GRAFANA CHANGE :: end

return EvaluationContext{
Ctx: ctx,
User: user,
LogzioEvalContext: logzioEvalContext, // LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts
Ctx: ctx,
User: user,
}
}

// LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts
func NewContextWithPreviousResults(ctx context.Context, user identity.Requester, reader AlertingResultsReader, logzioEvalContext *models.LogzioAlertRuleEvalContext) EvaluationContext {
if logzioEvalContext == nil {
logzioEvalContext = &models.LogzioAlertRuleEvalContext{
LogzioHeaders: http.Header{},
DsOverrideByDsUid: map[string]models.EvaluationDatasourceOverride{},
}
}

func NewContextWithPreviousResults(ctx context.Context, user identity.Requester, reader AlertingResultsReader) EvaluationContext {
return EvaluationContext{
Ctx: ctx,
User: user,
AlertingResultsReader: reader,
LogzioEvalContext: logzioEvalContext,
}
}

// LOGZ.IO GRAFANA CHANGE :: end
33 changes: 16 additions & 17 deletions pkg/services/ngalert/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"context"
"errors"
"fmt"
m "github.com/grafana/grafana/pkg/models"
"net/http"
"runtime/debug"
"sort"
"strconv"
Expand All @@ -18,7 +20,6 @@ import (
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/expr/classic"
"github.com/grafana/grafana/pkg/infra/log"
m "github.com/grafana/grafana/pkg/models" // LOGZ.IO GRAFANA CHANGE :: DEV-43744 - change to EvaluationContext
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/ngalert/models"
Expand Down Expand Up @@ -313,17 +314,23 @@ func buildDatasourceHeaders(ctx EvaluationContext) map[string]string { // LOGZ.I
}

// LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts
logzioEvalContext := ctx.LogzioEvalContext
logzioHeaders := m.LogzIoHeaders{RequestHeaders: logzioEvalContext.LogzioHeaders}
requestHeaders := make(map[string][]string, len(headers))
logzIoHeaders := &m.LogzIoHeaders{}
logzHeaders := ctx.Ctx.Value("logzioHeaders")
if logzHeaders != nil {
logzIoHeaders.RequestHeaders = http.Header{}
for k, v := range logzHeaders.(http.Header) {
logzIoHeaders.RequestHeaders[k] = v
}
newHeaders := http.Header{}
for k, v := range headers {
newHeaders[k] = []string{v}
}

for k, v := range headers {
requestHeaders[k] = []string{v}
for k, v := range logzIoHeaders.GetDatasourceQueryHeaders(newHeaders) {
headers[k] = v[0]
}
}

for k, v := range logzioHeaders.GetDatasourceQueryHeaders(requestHeaders) {
headers[k] = v[0]
}
logger.Debug("Added following headers to request", "headers", headers)
// LOGZ.IO GRAFANA CHANGE :: End

Expand All @@ -346,14 +353,6 @@ func getExprRequest(ctx EvaluationContext, condition models.Condition, dsCacheSe
switch nodeType := expr.NodeTypeFromDatasourceUID(q.DatasourceUID); nodeType {
case expr.TypeDatasourceNode:
ds, err = dsCacheService.GetDatasourceByUID(ctx.Ctx, q.DatasourceUID, ctx.User, false /*skipCache*/)
// LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add logzio datasource url override
if ds != nil {
if dsOverride, found := ctx.LogzioEvalContext.DsOverrideByDsUid[q.DatasourceUID]; found {
logger.Debug("Adding dsOverride", "datasource_uid", q.DatasourceUID, "dsOverride", dsOverride.UrlOverride)
ds.URL = dsOverride.UrlOverride
}
}
// // LOGZ.IO GRAFANA CHANGE :: End
default:
ds, err = expr.DataSourceModelFromNodeType(nodeType)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/ngalert/eval/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ func TestCreate_HysteresisCommand(t *testing.T) {
pluginsStore: store,
})
evaluator := NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(featuremgmt.FlagRecoveryThreshold), nil, tracing.InitializeTracerForTest()), store)
evalCtx := NewContextWithPreviousResults(context.Background(), u, testCase.reader, nil) // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add logzio datasources support
evalCtx := NewContextWithPreviousResults(context.Background(), u, testCase.reader)

eval, err := evaluator.Create(evalCtx, condition)
if testCase.error {
Expand Down
24 changes: 8 additions & 16 deletions pkg/services/ngalert/models/alert_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http" // LOGZ.IO GRAFANA CHANGE :: DEV-43883 Override datasource URL and pass custom headers to alert rule evaluator
"net/http"

//"net/http" // LOGZ.IO GRAFANA CHANGE :: DEV-43883 Override datasource URL and pass custom headers to alert rule evaluator
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -661,22 +663,12 @@ func (c Condition) IsValid() bool {
return len(c.Data) != 0
}

// LOGZ.IO GRAFANA CHANGE :: DEV-43883 Override datasource URL and pass custom headers to alert rule evaluator
// LOGZ.IO GRAFANA CHANGE :: DEV-43883 Support external alert evaluation
type ExternalAlertEvaluationRequest struct {
AlertRule AlertRule `json:"alertRule"`
EvalTime time.Time `json:"evalTime"`
FolderTitle string `json:"folderTitle"`
LogzioEvalContext LogzioAlertRuleEvalContext `json:"logzioEvalContext"`
}

type LogzioAlertRuleEvalContext struct {
LogzioHeaders http.Header `json:"headers"`
DsOverrideByDsUid map[string]EvaluationDatasourceOverride `json:"dsOverride"`
}

type EvaluationDatasourceOverride struct {
DsUid string `json:"dsUid"`
UrlOverride string `json:"urlOverride"`
AlertRule AlertRule
EvalTime time.Time
FolderTitle string
LogzHeaders http.Header
}

// LOGZ.IO GRAFANA CHANGE :: end
Expand Down
9 changes: 5 additions & 4 deletions pkg/services/ngalert/schedule/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"hash/fnv"
"math"
"net/http"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -127,10 +128,10 @@ func (a *alertRuleInfo) update(lastVersion ruleVersionAndPauseStatus) bool {
}

type evaluation struct {
scheduledAt time.Time
rule *models.AlertRule
folderTitle string
logzioEvalContext models.LogzioAlertRuleEvalContext // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add logzio datasources support
scheduledAt time.Time
rule *models.AlertRule
folderTitle string
logzHeaders http.Header // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support
}

type alertRulesRegistry struct {
Expand Down
35 changes: 20 additions & 15 deletions pkg/services/ngalert/schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,22 +363,24 @@ func (sch *schedule) RunRuleEvaluation(ctx context.Context, evalReq ngmodels.Ext
UID: evalReq.AlertRule.UID,
}
ev := evaluation{
scheduledAt: evalReq.EvalTime,
rule: &evalReq.AlertRule,
folderTitle: evalReq.FolderTitle,
logzioEvalContext: evalReq.LogzioEvalContext,
scheduledAt: evalReq.EvalTime,
rule: &evalReq.AlertRule,
folderTitle: evalReq.FolderTitle,
logzHeaders: evalReq.LogzHeaders,
}

// TODO: decide if we want to create the new routine if needed, or return error and rely on scheduler creating it + retrying on the sending side ??
ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, alertKey)
if !newRoutine {
logger.Debug("RunRuleEvaluation: sending ruleInfo.eval")
sent, dropped := ruleInfo.eval(&ev)
if !sent {
return fmt.Errorf("evaluation was not sent")
}
if dropped != nil {
logger.Warn("RunRuleEvaluation: got dropped eval", "dropped", dropped)
if sch.registry.exists(alertKey) {
// since we only get if exists then it shouldn't create and routine should exist
ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, alertKey)
if !newRoutine {
logger.Debug("RunRuleEvaluation: sending ruleInfo.eval")
sent, dropped := ruleInfo.eval(&ev)
if !sent {
return fmt.Errorf("evaluation was not sent")
}
if dropped != nil {
logger.Warn("RunRuleEvaluation: got dropped eval", "dropped", dropped)
}
}
} else {
return fmt.Errorf("no rule routine for alert key %s", alertKey)
Expand Down Expand Up @@ -423,7 +425,10 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
logger := logger.New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx)
start := sch.clock.Now()

evalCtx := eval.NewContextWithPreviousResults(ctx, SchedulerUserFor(e.rule.OrgID), sch.newLoadedMetricsReader(e.rule), &e.logzioEvalContext) // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add logzio datasources support
// LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support
ctxWithLogzHeaders := context.WithValue(ctx, "logzioHeaders", e.logzHeaders)
evalCtx := eval.NewContextWithPreviousResults(ctxWithLogzHeaders, SchedulerUserFor(e.rule.OrgID), sch.newLoadedMetricsReader(e.rule))
// LOGZ.IO GRAFANA CHANGE :: End
if sch.evaluatorFactory == nil {
panic("evalfactory nil")
}
Expand Down
Loading
Loading