From ad3731980ce93883b0a8f3c26685c49c692bd77f Mon Sep 17 00:00:00 2001 From: kittycatbone Date: Tue, 7 May 2024 22:00:51 +0300 Subject: [PATCH 1/5] DEV-44293 - Support Logz Datasource * Added required changes in elasticsearch and prometheus clients to add logz headers so we can call logz datasources * Refactored implementation in logz alert evaluation so we only pass headers because url change is no longer required * Added fix to alert evaluation to check if rule routine exists instead of getOrCreate which caused routines not to be created via scheduler if eval called before scheduler updates registry. * added to custom.ini required configs for logz alert evaluation to work --- custom.ini | 7 ++-- pkg/api/metrics.go | 5 ++- pkg/expr/nodes.go | 11 +++++- pkg/models/headers_logzio.go | 8 ++++- pkg/services/ngalert/api/alerting_logzio.go | 12 +------ pkg/services/ngalert/api/api_testing.go | 10 ++++-- .../api/tooling/definitions/alertmanager.go | 7 ++-- pkg/services/ngalert/backtesting/engine.go | 2 +- pkg/services/ngalert/eval/context.go | 29 ++------------- pkg/services/ngalert/eval/eval.go | 33 +++++++++-------- pkg/services/ngalert/eval/eval_test.go | 2 +- pkg/services/ngalert/models/alert_rule.go | 24 +++++-------- pkg/services/ngalert/schedule/registry.go | 9 ++--- pkg/services/ngalert/schedule/schedule.go | 35 +++++++++++-------- pkg/tsdb/elasticsearch/client/client.go | 6 ++-- pkg/tsdb/elasticsearch/elasticsearch.go | 2 +- pkg/tsdb/prometheus/client/client.go | 23 ++++++++---- 17 files changed, 112 insertions(+), 113 deletions(-) diff --git a/custom.ini b/custom.ini index a286d8da4f7ba..fe3ffbf6e9f97 100644 --- a/custom.ini +++ b/custom.ini @@ -48,13 +48,14 @@ news_feed_enabled = false [alerting] enabled = false -execute_alerts = false +execute_alerts = true min_interval_seconds = 60 [unified_alerting] enabled = true -execute_alerts = false +execute_alerts = true alert_manager_enabled = true +scheduled_evaluation_enabled = false [security] disable_gravatar = true @@ -81,4 +82,4 @@ min_refresh_interval = 30s [log.frontend] enabled = true -custom_endpoint = log \ No newline at end of file +custom_endpoint = log diff --git a/pkg/api/metrics.go b/pkg/api/metrics.go index df798af8ca886..3d98329c87c73 100644 --- a/pkg/api/metrics.go +++ b/pkg/api/metrics.go @@ -78,7 +78,10 @@ func (hs *HTTPServer) QueryMetricsV2(c *contextmodel.ReqContext) response.Respon return response.Error(http.StatusBadRequest, "bad request data", err) } - resp, err := hs.queryDataService.QueryData(c.Req.Context(), c.SignedInUser, c.SkipDSCache, reqDTO) + // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support + ctxWithLogzHeaders := context.WithValue(c.Req.Context(), "logzioHeaders", c.Req.Header) + resp, err := hs.queryDataService.QueryData(ctxWithLogzHeaders, c.SignedInUser, c.SkipDSCache, reqDTO) + // LOGZ.IO GRAFANA CHANGE :: End if err != nil { return hs.handleQueryMetricsError(err) } diff --git a/pkg/expr/nodes.go b/pkg/expr/nodes.go index 32883d305d6e7..a3bb977a71ff3 100644 --- a/pkg/expr/nodes.go +++ b/pkg/expr/nodes.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "net/http" "strings" "time" @@ -347,7 +348,15 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc() }() - resp, err := s.dataService.QueryData(ctx, req) + // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support + logzHeaders := http.Header{} + for k, v := range req.Headers { + logzHeaders[k] = []string{v} + } + ctxWithLogzio := context.WithValue(ctx, "logzioHeaders", logzHeaders) + + resp, err := s.dataService.QueryData(ctxWithLogzio, req) + // LOGZ.IO GRAFANA CHANGE :: End if err != nil { return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err) } diff --git a/pkg/models/headers_logzio.go b/pkg/models/headers_logzio.go index c1b3f2c7324dd..4b8425a9d0894 100644 --- a/pkg/models/headers_logzio.go +++ b/pkg/models/headers_logzio.go @@ -11,6 +11,8 @@ type LogzIoHeaders struct { var logzioHeadersWhitelist = []string{ "user-context", + "X-Logz-Query-Context", + "Query-Source", } func (logzioHeaders *LogzIoHeaders) GetDatasourceQueryHeaders(grafanaGeneratedHeaders http.Header) http.Header { @@ -19,7 +21,11 @@ func (logzioHeaders *LogzIoHeaders) GetDatasourceQueryHeaders(grafanaGeneratedHe for _, whitelistedHeader := range logzioHeadersWhitelist { if requestHeader := logzioGrafanaRequestHeaders.Get(whitelistedHeader); requestHeader != "" { - datasourceRequestHeaders.Set(whitelistedHeader, requestHeader) + if whitelistedHeader == "X-Logz-Query-Context" { + datasourceRequestHeaders.Set("User-Context", requestHeader) + } else { + datasourceRequestHeaders.Set(whitelistedHeader, requestHeader) + } } } diff --git a/pkg/services/ngalert/api/alerting_logzio.go b/pkg/services/ngalert/api/alerting_logzio.go index 6d96816547b50..6d9e3db31c60e 100644 --- a/pkg/services/ngalert/api/alerting_logzio.go +++ b/pkg/services/ngalert/api/alerting_logzio.go @@ -47,21 +47,11 @@ func (srv *LogzioAlertingService) RouteEvaluateAlert(c *contextmodel.ReqContext, for _, evalRequest := range evalRequests { c.Logger.Info("Evaluate Alert API", "evalTime", evalRequest.EvalTime, "ruleTitle", evalRequest.AlertRule.Title, "ruleUID", evalRequest.AlertRule.UID) - var dsOverrideByDsUid = map[string]ngmodels.EvaluationDatasourceOverride{} - if evalRequest.DsOverrides != nil { - for _, dsOverride := range evalRequest.DsOverrides { - dsOverrideByDsUid[dsOverride.DsUid] = dsOverride - } - } - evalReq := ngmodels.ExternalAlertEvaluationRequest{ AlertRule: evalRequest.AlertRule, EvalTime: evalRequest.EvalTime, FolderTitle: evalRequest.FolderTitle, - LogzioEvalContext: ngmodels.LogzioAlertRuleEvalContext{ - LogzioHeaders: c.Req.Header, - DsOverrideByDsUid: dsOverrideByDsUid, - }, + LogzHeaders: c.Req.Header, } err := srv.Schedule.RunRuleEvaluation(c.Req.Context(), evalReq) diff --git a/pkg/services/ngalert/api/api_testing.go b/pkg/services/ngalert/api/api_testing.go index 5e54dc8869cf9..5c38933acd625 100644 --- a/pkg/services/ngalert/api/api_testing.go +++ b/pkg/services/ngalert/api/api_testing.go @@ -83,7 +83,10 @@ func (srv TestingApiSrv) RouteTestGrafanaRuleConfig(c *contextmodel.ReqContext, } } - evaluator, err := srv.evaluator.Create(eval.NewContext(c.Req.Context(), c.SignedInUser), rule.GetEvalCondition()) + // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support + ctxWithLogzHeaders := context.WithValue(c.Req.Context(), "logzioHeaders", c.Req.Header) + evaluator, err := srv.evaluator.Create(eval.NewContext(ctxWithLogzHeaders, c.SignedInUser), rule.GetEvalCondition()) + // LOGZ.IO GRAFANA CHANGE :: End if err != nil { return ErrResp(http.StatusBadRequest, err, "Failed to build evaluator for queries and expressions") } @@ -183,7 +186,10 @@ func (srv TestingApiSrv) RouteEvalQueries(c *contextmodel.ReqContext, cmd apimod } } - evaluator, err := srv.evaluator.Create(eval.NewContext(c.Req.Context(), c.SignedInUser), cond) + // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support + ctxWithLogzHeaders := context.WithValue(c.Req.Context(), "logzioHeaders", c.Req.Header) + evaluator, err := srv.evaluator.Create(eval.NewContext(ctxWithLogzHeaders, c.SignedInUser), cond) + // LOGZ.IO GRAFANA CHANGE :: End if err != nil { return ErrResp(http.StatusBadRequest, err, "Failed to build evaluator for queries and expressions") diff --git a/pkg/services/ngalert/api/tooling/definitions/alertmanager.go b/pkg/services/ngalert/api/tooling/definitions/alertmanager.go index d09d96894d8a9..28347e7b70ccd 100644 --- a/pkg/services/ngalert/api/tooling/definitions/alertmanager.go +++ b/pkg/services/ngalert/api/tooling/definitions/alertmanager.go @@ -562,10 +562,9 @@ type AlertSendNotificationsRequest struct { // LOGZ.IO GRAFANA CHANGE :: DEV-43744: add api models for alert evaluation requests/responses type AlertEvaluationRequest struct { - AlertRule models.AlertRule `json:"alertRule"` - EvalTime time.Time `json:"evalTime"` - FolderTitle string `json:"folderTitle"` - DsOverrides []models.EvaluationDatasourceOverride `json:"dsOverrides"` + AlertRule models.AlertRule `json:"alertRule"` + EvalTime time.Time `json:"evalTime"` + FolderTitle string `json:"folderTitle"` } type AlertEvalRunResult struct { diff --git a/pkg/services/ngalert/backtesting/engine.go b/pkg/services/ngalert/backtesting/engine.go index 95beaa0b71484..30d4c8135b9d6 100644 --- a/pkg/services/ngalert/backtesting/engine.go +++ b/pkg/services/ngalert/backtesting/engine.go @@ -156,7 +156,7 @@ func newBacktestingEvaluator(ctx context.Context, evalFactory eval.EvaluatorFact } } - evaluator, err := evalFactory.Create(eval.NewContextWithPreviousResults(ctx, user, reader, nil), condition) // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add logzio datasources support + evaluator, err := evalFactory.Create(eval.NewContextWithPreviousResults(ctx, user, reader), condition) if err != nil { return nil, err diff --git a/pkg/services/ngalert/eval/context.go b/pkg/services/ngalert/eval/context.go index 480920625f126..b1919d63a8f4a 100644 --- a/pkg/services/ngalert/eval/context.go +++ b/pkg/services/ngalert/eval/context.go @@ -2,12 +2,9 @@ package eval import ( "context" - "net/http" // LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts - "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/services/auth/identity" - "github.com/grafana/grafana/pkg/services/ngalert/models" // LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts ) // AlertingResultsReader provides fingerprints of results that are in alerting state. @@ -21,39 +18,19 @@ type EvaluationContext struct { Ctx context.Context User identity.Requester AlertingResultsReader AlertingResultsReader - LogzioEvalContext *models.LogzioAlertRuleEvalContext // LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts } func NewContext(ctx context.Context, user identity.Requester) EvaluationContext { - // LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts - logzioEvalContext := &models.LogzioAlertRuleEvalContext{ - LogzioHeaders: http.Header{}, - DsOverrideByDsUid: map[string]models.EvaluationDatasourceOverride{}, - } - // LOGZ.IO GRAFANA CHANGE :: end - return EvaluationContext{ - Ctx: ctx, - User: user, - LogzioEvalContext: logzioEvalContext, // LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts + Ctx: ctx, + User: user, } } -// LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts -func NewContextWithPreviousResults(ctx context.Context, user identity.Requester, reader AlertingResultsReader, logzioEvalContext *models.LogzioAlertRuleEvalContext) EvaluationContext { - if logzioEvalContext == nil { - logzioEvalContext = &models.LogzioAlertRuleEvalContext{ - LogzioHeaders: http.Header{}, - DsOverrideByDsUid: map[string]models.EvaluationDatasourceOverride{}, - } - } - +func NewContextWithPreviousResults(ctx context.Context, user identity.Requester, reader AlertingResultsReader) EvaluationContext { return EvaluationContext{ Ctx: ctx, User: user, AlertingResultsReader: reader, - LogzioEvalContext: logzioEvalContext, } } - -// LOGZ.IO GRAFANA CHANGE :: end diff --git a/pkg/services/ngalert/eval/eval.go b/pkg/services/ngalert/eval/eval.go index 237baefeec430..5cb2a4f35262b 100644 --- a/pkg/services/ngalert/eval/eval.go +++ b/pkg/services/ngalert/eval/eval.go @@ -6,6 +6,8 @@ import ( "context" "errors" "fmt" + m "github.com/grafana/grafana/pkg/models" + "net/http" "runtime/debug" "sort" "strconv" @@ -18,7 +20,6 @@ import ( "github.com/grafana/grafana/pkg/expr" "github.com/grafana/grafana/pkg/expr/classic" "github.com/grafana/grafana/pkg/infra/log" - m "github.com/grafana/grafana/pkg/models" // LOGZ.IO GRAFANA CHANGE :: DEV-43744 - change to EvaluationContext "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/ngalert/models" @@ -313,17 +314,23 @@ func buildDatasourceHeaders(ctx EvaluationContext) map[string]string { // LOGZ.I } // LOGZ.IO GRAFANA CHANGE :: DEV-43744 - Pass headers and custom datasource to evaluate alerts - logzioEvalContext := ctx.LogzioEvalContext - logzioHeaders := m.LogzIoHeaders{RequestHeaders: logzioEvalContext.LogzioHeaders} - requestHeaders := make(map[string][]string, len(headers)) + logzIoHeaders := &m.LogzIoHeaders{} + logzHeaders := ctx.Ctx.Value("logzioHeaders") + if logzHeaders != nil { + logzIoHeaders.RequestHeaders = http.Header{} + for k, v := range logzHeaders.(http.Header) { + logzIoHeaders.RequestHeaders[k] = v + } + newHeaders := http.Header{} + for k, v := range headers { + newHeaders[k] = []string{v} + } - for k, v := range headers { - requestHeaders[k] = []string{v} + for k, v := range logzIoHeaders.GetDatasourceQueryHeaders(newHeaders) { + headers[k] = v[0] + } } - for k, v := range logzioHeaders.GetDatasourceQueryHeaders(requestHeaders) { - headers[k] = v[0] - } logger.Debug("Added following headers to request", "headers", headers) // LOGZ.IO GRAFANA CHANGE :: End @@ -346,14 +353,6 @@ func getExprRequest(ctx EvaluationContext, condition models.Condition, dsCacheSe switch nodeType := expr.NodeTypeFromDatasourceUID(q.DatasourceUID); nodeType { case expr.TypeDatasourceNode: ds, err = dsCacheService.GetDatasourceByUID(ctx.Ctx, q.DatasourceUID, ctx.User, false /*skipCache*/) - // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add logzio datasource url override - if ds != nil { - if dsOverride, found := ctx.LogzioEvalContext.DsOverrideByDsUid[q.DatasourceUID]; found { - logger.Debug("Adding dsOverride", "datasource_uid", q.DatasourceUID, "dsOverride", dsOverride.UrlOverride) - ds.URL = dsOverride.UrlOverride - } - } - // // LOGZ.IO GRAFANA CHANGE :: End default: ds, err = expr.DataSourceModelFromNodeType(nodeType) } diff --git a/pkg/services/ngalert/eval/eval_test.go b/pkg/services/ngalert/eval/eval_test.go index 66a38d6917524..1a7615ef5dc21 100644 --- a/pkg/services/ngalert/eval/eval_test.go +++ b/pkg/services/ngalert/eval/eval_test.go @@ -709,7 +709,7 @@ func TestCreate_HysteresisCommand(t *testing.T) { pluginsStore: store, }) evaluator := NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(featuremgmt.FlagRecoveryThreshold), nil, tracing.InitializeTracerForTest()), store) - evalCtx := NewContextWithPreviousResults(context.Background(), u, testCase.reader, nil) // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add logzio datasources support + evalCtx := NewContextWithPreviousResults(context.Background(), u, testCase.reader) eval, err := evaluator.Create(evalCtx, condition) if testCase.error { diff --git a/pkg/services/ngalert/models/alert_rule.go b/pkg/services/ngalert/models/alert_rule.go index 5cf4a5192f832..1da1e2fc35679 100644 --- a/pkg/services/ngalert/models/alert_rule.go +++ b/pkg/services/ngalert/models/alert_rule.go @@ -5,7 +5,9 @@ import ( "encoding/json" "errors" "fmt" - "net/http" // LOGZ.IO GRAFANA CHANGE :: DEV-43883 Override datasource URL and pass custom headers to alert rule evaluator + "net/http" + + //"net/http" // LOGZ.IO GRAFANA CHANGE :: DEV-43883 Override datasource URL and pass custom headers to alert rule evaluator "sort" "strconv" "strings" @@ -661,22 +663,12 @@ func (c Condition) IsValid() bool { return len(c.Data) != 0 } -// LOGZ.IO GRAFANA CHANGE :: DEV-43883 Override datasource URL and pass custom headers to alert rule evaluator +// LOGZ.IO GRAFANA CHANGE :: DEV-43883 Support external alert evaluation type ExternalAlertEvaluationRequest struct { - AlertRule AlertRule `json:"alertRule"` - EvalTime time.Time `json:"evalTime"` - FolderTitle string `json:"folderTitle"` - LogzioEvalContext LogzioAlertRuleEvalContext `json:"logzioEvalContext"` -} - -type LogzioAlertRuleEvalContext struct { - LogzioHeaders http.Header `json:"headers"` - DsOverrideByDsUid map[string]EvaluationDatasourceOverride `json:"dsOverride"` -} - -type EvaluationDatasourceOverride struct { - DsUid string `json:"dsUid"` - UrlOverride string `json:"urlOverride"` + AlertRule AlertRule + EvalTime time.Time + FolderTitle string + LogzHeaders http.Header } // LOGZ.IO GRAFANA CHANGE :: end diff --git a/pkg/services/ngalert/schedule/registry.go b/pkg/services/ngalert/schedule/registry.go index f8ac0d77d2f08..64f89c5c3c826 100644 --- a/pkg/services/ngalert/schedule/registry.go +++ b/pkg/services/ngalert/schedule/registry.go @@ -7,6 +7,7 @@ import ( "fmt" "hash/fnv" "math" + "net/http" "sort" "sync" "time" @@ -127,10 +128,10 @@ func (a *alertRuleInfo) update(lastVersion ruleVersionAndPauseStatus) bool { } type evaluation struct { - scheduledAt time.Time - rule *models.AlertRule - folderTitle string - logzioEvalContext models.LogzioAlertRuleEvalContext // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add logzio datasources support + scheduledAt time.Time + rule *models.AlertRule + folderTitle string + logzHeaders http.Header // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support } type alertRulesRegistry struct { diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index b59709a8fa75a..a9051eee36c9a 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -363,22 +363,24 @@ func (sch *schedule) RunRuleEvaluation(ctx context.Context, evalReq ngmodels.Ext UID: evalReq.AlertRule.UID, } ev := evaluation{ - scheduledAt: evalReq.EvalTime, - rule: &evalReq.AlertRule, - folderTitle: evalReq.FolderTitle, - logzioEvalContext: evalReq.LogzioEvalContext, + scheduledAt: evalReq.EvalTime, + rule: &evalReq.AlertRule, + folderTitle: evalReq.FolderTitle, + logzHeaders: evalReq.LogzHeaders, } - // TODO: decide if we want to create the new routine if needed, or return error and rely on scheduler creating it + retrying on the sending side ?? - ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, alertKey) - if !newRoutine { - logger.Debug("RunRuleEvaluation: sending ruleInfo.eval") - sent, dropped := ruleInfo.eval(&ev) - if !sent { - return fmt.Errorf("evaluation was not sent") - } - if dropped != nil { - logger.Warn("RunRuleEvaluation: got dropped eval", "dropped", dropped) + if sch.registry.exists(alertKey) { + // since we only get if exists then it shouldn't create and routine should exist + ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, alertKey) + if !newRoutine { + logger.Debug("RunRuleEvaluation: sending ruleInfo.eval") + sent, dropped := ruleInfo.eval(&ev) + if !sent { + return fmt.Errorf("evaluation was not sent") + } + if dropped != nil { + logger.Warn("RunRuleEvaluation: got dropped eval", "dropped", dropped) + } } } else { return fmt.Errorf("no rule routine for alert key %s", alertKey) @@ -423,7 +425,10 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR logger := logger.New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx) start := sch.clock.Now() - evalCtx := eval.NewContextWithPreviousResults(ctx, SchedulerUserFor(e.rule.OrgID), sch.newLoadedMetricsReader(e.rule), &e.logzioEvalContext) // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add logzio datasources support + // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support + ctxWithLogzHeaders := context.WithValue(ctx, "logzioHeaders", e.logzHeaders) + evalCtx := eval.NewContextWithPreviousResults(ctxWithLogzHeaders, SchedulerUserFor(e.rule.OrgID), sch.newLoadedMetricsReader(e.rule)) + // LOGZ.IO GRAFANA CHANGE :: End if sch.evaluatorFactory == nil { panic("evalfactory nil") } diff --git a/pkg/tsdb/elasticsearch/client/client.go b/pkg/tsdb/elasticsearch/client/client.go index 4cb458a4b2e24..8033c2c848f26 100644 --- a/pkg/tsdb/elasticsearch/client/client.go +++ b/pkg/tsdb/elasticsearch/client/client.go @@ -77,8 +77,8 @@ var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend. headers := ctx.Value("logzioHeaders") if headers != nil { logzIoHeaders.RequestHeaders = http.Header{} - for key, value := range headers.(map[string]string) { - logzIoHeaders.RequestHeaders.Set(key, value) + for k, v := range headers.(http.Header) { + logzIoHeaders.RequestHeaders[k] = v } } // LOGZ.IO GRAFANA CHANGE :: End @@ -175,7 +175,7 @@ func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body [ req.Header = c.logzIoHeaders.GetDatasourceQueryHeaders(req.Header) // LOGZ.IO GRAFANA CHANGE :: DEV-43883 Support external alert evaluation req.Header.Set("Content-Type", "application/json") // LOGZ.IO GRAFANA CHANGE :: DEV-43744 use application/json to interact with query-service - + c.logger.Debug("request headers", "headers", req.Header) //nolint:bodyclose resp, err := c.ds.HTTPClient.Do(req) if err != nil { diff --git a/pkg/tsdb/elasticsearch/elasticsearch.go b/pkg/tsdb/elasticsearch/elasticsearch.go index bb12e6e907eb9..bc9138c657692 100644 --- a/pkg/tsdb/elasticsearch/elasticsearch.go +++ b/pkg/tsdb/elasticsearch/elasticsearch.go @@ -71,7 +71,7 @@ func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *es.Da return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries") } - client, err := es.NewClient(context.WithValue(ctx, "logzioHeaders", req.Headers), dsInfo, req.Queries[0].TimeRange, logger, tracer) // LOGZ.IO GRAFANA CHANGE :: DEV-43883 - add LogzIoHeaders + client, err := es.NewClient(ctx, dsInfo, req.Queries[0].TimeRange, logger, tracer) if err != nil { return &backend.QueryDataResponse{}, err } diff --git a/pkg/tsdb/prometheus/client/client.go b/pkg/tsdb/prometheus/client/client.go index 3682f77f2ee71..52429b8afaf9b 100644 --- a/pkg/tsdb/prometheus/client/client.go +++ b/pkg/tsdb/prometheus/client/client.go @@ -3,6 +3,8 @@ package client import ( "bytes" "context" + "github.com/grafana/grafana/pkg/infra/log" + m "github.com/grafana/grafana/pkg/models" "io" "net/http" "net/url" @@ -47,8 +49,6 @@ func (c *Client) QueryRange(ctx context.Context, q *models.Query, headers map[st return nil, err } - c.addHeaders(headers, req) // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support - return c.doer.Do(req) } @@ -65,8 +65,6 @@ func (c *Client) QueryInstant(ctx context.Context, q *models.Query, headers map[ return nil, err } - c.addHeaders(headers, req) // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support - return c.doer.Do(req) } @@ -83,8 +81,6 @@ func (c *Client) QueryExemplars(ctx context.Context, q *models.Query, headers ma return nil, err } - c.addHeaders(headers, req) // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support - return c.doer.Do(req) } @@ -170,6 +166,21 @@ func createRequest(ctx context.Context, method string, u *url.URL, bodyReader io // It's set to nil so it is not actually sent over the wire, just used in Go http lib to retry requests. request.Header["Idempotency-Key"] = nil } + + logger := log.New(ctx) + logzHeaders := ctx.Value("logzioHeaders") + if logzHeaders != nil { + logzIoHeaders := &m.LogzIoHeaders{} + logzIoHeaders.RequestHeaders = http.Header{} + for k, v := range logzHeaders.(http.Header) { + logzIoHeaders.RequestHeaders[k] = v + } + + request.Header = logzIoHeaders.GetDatasourceQueryHeaders(request.Header) + } + + logger.Debug("created request", "headers", request.Header, "url", request.URL) + return request, nil } From 2e30bff12aad1320c02ad84036eb5c282a34312e Mon Sep 17 00:00:00 2001 From: kittycatbone Date: Thu, 23 May 2024 17:49:12 +0300 Subject: [PATCH 2/5] DEV-44293 - Support Logz Datasource * Added relevant header for m3 query ervice endpoints on prometheus client * Added on elasticsearch the relevant queryParams for querySource --- pkg/tsdb/elasticsearch/client/client.go | 24 ++++++++++++++++++------ pkg/tsdb/prometheus/client/client.go | 3 +++ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/pkg/tsdb/elasticsearch/client/client.go b/pkg/tsdb/elasticsearch/client/client.go index 8033c2c848f26..6864e7b892edf 100644 --- a/pkg/tsdb/elasticsearch/client/client.go +++ b/pkg/tsdb/elasticsearch/client/client.go @@ -175,7 +175,7 @@ func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body [ req.Header = c.logzIoHeaders.GetDatasourceQueryHeaders(req.Header) // LOGZ.IO GRAFANA CHANGE :: DEV-43883 Support external alert evaluation req.Header.Set("Content-Type", "application/json") // LOGZ.IO GRAFANA CHANGE :: DEV-43744 use application/json to interact with query-service - c.logger.Debug("request headers", "headers", req.Header) + c.logger.Debug("request details", "headers", req.Header, "url", req.URL.String()) //nolint:bodyclose resp, err := c.ds.HTTPClient.Do(req) if err != nil { @@ -288,14 +288,26 @@ func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchReque func (c *baseClientImpl) getMultiSearchQueryParameters() string { var qs []string - // LOGZ.IO GRAFANA CHANGE :: DEV-43889 Grafana alerts evaluation - set 'accountsToSearch' query param + // LOGZ.IO GRAFANA CHANGE :: DEV-43889 Grafana alerts evaluation - set 'accountsToSearch' and 'querySource' params + var querySourceFromLogzHeaders []string + headers := c.logzIoHeaders.RequestHeaders + if headers != nil { + querySourceFromLogzHeaders = headers["Query-Source"] + if len(querySourceFromLogzHeaders) > 0 { + qs = append(qs, fmt.Sprintf("querySource=%s", querySourceFromLogzHeaders[0])) + } + } + datasourceUrl, _ := url.Parse(c.ds.URL) q, _ := url.ParseQuery(datasourceUrl.RawQuery) - if len(q.Get("querySource")) > 0 { - // set/override 'accountsToSearch' as Database (accountId) - qs = append(qs, fmt.Sprintf("accountsToSearch=%s", c.ds.Database)) - qs = append(qs, "querySource=INTERNAL_METRICS_ALERTS") + for key, values := range q { + if key != "querySource" || len(querySourceFromLogzHeaders) == 0 { + for _, v := range values { + qs = append(qs, fmt.Sprintf("%s=%s", key, v)) + } + } } + // LOGZ.IO end maxConcurrentShardRequests := c.ds.MaxConcurrentShardRequests diff --git a/pkg/tsdb/prometheus/client/client.go b/pkg/tsdb/prometheus/client/client.go index 52429b8afaf9b..e8932df4a68bc 100644 --- a/pkg/tsdb/prometheus/client/client.go +++ b/pkg/tsdb/prometheus/client/client.go @@ -178,6 +178,9 @@ func createRequest(ctx context.Context, method string, u *url.URL, bodyReader io request.Header = logzIoHeaders.GetDatasourceQueryHeaders(request.Header) } + if request.Header.Get("Query-Source") == "" { + request.Header.Set("Query-Source", "GRAFANA") + } logger.Debug("created request", "headers", request.Header, "url", request.URL) From 266fb6b6505742412eb094f5a2c4eb9ad01e8dbd Mon Sep 17 00:00:00 2001 From: kittycatbone Date: Mon, 27 May 2024 13:17:48 +0300 Subject: [PATCH 3/5] added comment --- pkg/expr/nodes.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/expr/nodes.go b/pkg/expr/nodes.go index a3bb977a71ff3..ca773714f002e 100644 --- a/pkg/expr/nodes.go +++ b/pkg/expr/nodes.go @@ -5,7 +5,7 @@ import ( "encoding/json" "errors" "fmt" - "net/http" + "net/http" // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support "strings" "time" From 1f6d27b434a2e2597376506de94c35df0f836c1d Mon Sep 17 00:00:00 2001 From: kittycatbone Date: Thu, 30 May 2024 10:43:30 +0300 Subject: [PATCH 4/5] Fix user-context header to be unescaped Fix for prometheus_test.go to work with changes Logz comments in missing places --- pkg/models/headers_logzio.go | 6 +++++- pkg/tsdb/prometheus/client/client.go | 15 ++++----------- pkg/tsdb/prometheus/prometheus_test.go | 1 + 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/pkg/models/headers_logzio.go b/pkg/models/headers_logzio.go index 4b8425a9d0894..3d02c6d2e85d8 100644 --- a/pkg/models/headers_logzio.go +++ b/pkg/models/headers_logzio.go @@ -3,6 +3,7 @@ package models import ( "net/http" + "net/url" ) type LogzIoHeaders struct { @@ -22,7 +23,10 @@ func (logzioHeaders *LogzIoHeaders) GetDatasourceQueryHeaders(grafanaGeneratedHe for _, whitelistedHeader := range logzioHeadersWhitelist { if requestHeader := logzioGrafanaRequestHeaders.Get(whitelistedHeader); requestHeader != "" { if whitelistedHeader == "X-Logz-Query-Context" { - datasourceRequestHeaders.Set("User-Context", requestHeader) + unescapedHeader, err := url.PathUnescape(requestHeader) + if err == nil { + datasourceRequestHeaders.Set("User-Context", unescapedHeader) + } } else { datasourceRequestHeaders.Set(whitelistedHeader, requestHeader) } diff --git a/pkg/tsdb/prometheus/client/client.go b/pkg/tsdb/prometheus/client/client.go index e8932df4a68bc..0e2fffd65c7ca 100644 --- a/pkg/tsdb/prometheus/client/client.go +++ b/pkg/tsdb/prometheus/client/client.go @@ -3,8 +3,8 @@ package client import ( "bytes" "context" - "github.com/grafana/grafana/pkg/infra/log" - m "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/infra/log" // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support + m "github.com/grafana/grafana/pkg/models" // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support "io" "net/http" "net/url" @@ -167,6 +167,7 @@ func createRequest(ctx context.Context, method string, u *url.URL, bodyReader io request.Header["Idempotency-Key"] = nil } + // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support logger := log.New(ctx) logzHeaders := ctx.Value("logzioHeaders") if logzHeaders != nil { @@ -183,6 +184,7 @@ func createRequest(ctx context.Context, method string, u *url.URL, bodyReader io } logger.Debug("created request", "headers", request.Header, "url", request.URL) + // LOGZ.IO GRAFANA CHANGE :: End return request, nil } @@ -190,12 +192,3 @@ func createRequest(ctx context.Context, method string, u *url.URL, bodyReader io func formatTime(t time.Time) string { return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64) } - -// LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support -func (c *Client) addHeaders(headers map[string]string, req *http.Request) { - for k, v := range headers { - req.Header[k] = []string{v} - } -} - -// LOGZ.IO GRAFANA CHANGE :: End diff --git a/pkg/tsdb/prometheus/prometheus_test.go b/pkg/tsdb/prometheus/prometheus_test.go index 1d21212a18f61..5ceb0940211e4 100644 --- a/pkg/tsdb/prometheus/prometheus_test.go +++ b/pkg/tsdb/prometheus/prometheus_test.go @@ -104,6 +104,7 @@ func TestService(t *testing.T) { http.Header{ "Content-Type": {"application/x-www-form-urlencoded"}, "Idempotency-Key": []string(nil), + "Query-Source": {"GRAFANA"}, // LOGZ.IO GRAFANA CHANGE :: DEV-43889 - Add headers for logzio datasources support }, f.Roundtripper.Req.Header) require.Equal(t, http.MethodPost, f.Roundtripper.Req.Method) From 024b4fa2428d4bd68df861186f0d25f10f4805f2 Mon Sep 17 00:00:00 2001 From: kittycatbone Date: Thu, 30 May 2024 12:38:01 +0300 Subject: [PATCH 5/5] Fix schedule_unit_test.go --- .../ngalert/schedule/schedule_unit_test.go | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go index 3ffaef85e6238..0e400a51bee40 100644 --- a/pkg/services/ngalert/schedule/schedule_unit_test.go +++ b/pkg/services/ngalert/schedule/schedule_unit_test.go @@ -71,15 +71,16 @@ func TestProcessTicks(t *testing.T) { evaluator := eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheServ, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, &featuremgmt.FeatureManager{}, nil, tracing.InitializeTracerForTest()), &pluginstore.FakePluginStore{}) schedCfg := SchedulerCfg{ - BaseInterval: cfg.BaseInterval, - C: mockedClock, - AppURL: appUrl, - EvaluatorFactory: evaluator, - RuleStore: ruleStore, - Metrics: testMetrics.GetSchedulerMetrics(), - AlertSender: notifier, - Tracer: testTracer, - Log: log.New("ngalert.scheduler"), + BaseInterval: cfg.BaseInterval, + C: mockedClock, + AppURL: appUrl, + EvaluatorFactory: evaluator, + RuleStore: ruleStore, + Metrics: testMetrics.GetSchedulerMetrics(), + AlertSender: notifier, + Tracer: testTracer, + Log: log.New("ngalert.scheduler"), + ScheduledEvalEnabled: true, // LOGZ.IO GRAFANA CHANGE :: DEV-43744 Add scheduled evaluation enabled config } managerCfg := state.ManagerCfg{ Metrics: testMetrics.GetStateMetrics(), @@ -887,16 +888,17 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor } schedCfg := SchedulerCfg{ - BaseInterval: cfg.BaseInterval, - MaxAttempts: cfg.MaxAttempts, - C: mockedClock, - AppURL: appUrl, - EvaluatorFactory: evaluator, - RuleStore: rs, - Metrics: m.GetSchedulerMetrics(), - AlertSender: senderMock, - Tracer: testTracer, - Log: log.New("ngalert.scheduler"), + BaseInterval: cfg.BaseInterval, + MaxAttempts: cfg.MaxAttempts, + C: mockedClock, + AppURL: appUrl, + EvaluatorFactory: evaluator, + RuleStore: rs, + Metrics: m.GetSchedulerMetrics(), + AlertSender: senderMock, + Tracer: testTracer, + Log: log.New("ngalert.scheduler"), + ScheduledEvalEnabled: true, // LOGZ.IO GRAFANA CHANGE :: DEV-43744 Add scheduled evaluation enabled config } managerCfg := state.ManagerCfg{ Metrics: m.GetStateMetrics(),