diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index acd03b3de78..5e7f45b58d6 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4506,6 +4506,17 @@ The `query_frontend_config` configures the Cortex query-frontend. # CLI flag: -frontend.enabled-ruler-query-stats [enabled_ruler_query_stats_log: | default = false] +# Active query tracker monitors active queries, and writes them to the file in +# given directory. If Cortex discovers any queries in this log during startup, +# it will log them to the log file. Setting to empty value disables active query +# tracker, which also disables -frontend.max-concurrent option. +# CLI flag: -frontend.active-query-tracker-dir +[active_query_tracker_dir: | default = ""] + +# The maximum number of concurrent queries running in query frontend. +# CLI flag: -frontend.max-concurrent +[max_concurrent: | default = 500] + # If a querier disconnects without sending notification about graceful shutdown, # the query-frontend will keep the querier in the tenant's shard until the # forget delay has passed. This feature is useful to reduce the blast radius diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index bba985ea1c0..e9bf98870a4 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -30,6 +30,7 @@ import ( util_api "github.com/cortexproject/cortex/pkg/util/api" "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/prometheus/prometheus/promql" ) const ( @@ -81,6 +82,8 @@ type HandlerConfig struct { MaxBodySize int64 `yaml:"max_body_size"` QueryStatsEnabled bool `yaml:"query_stats_enabled"` EnabledRulerQueryStatsLog bool `yaml:"enabled_ruler_query_stats_log"` + ActiveQueryTrackerDir string `yaml:"active_query_tracker_dir"` + MaxConcurrent int `yaml:"max_concurrent"` } func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) { @@ -88,6 +91,8 @@ func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) { f.Int64Var(&cfg.MaxBodySize, "frontend.max-body-size", 10*1024*1024, "Max body size for downstream prometheus.") f.BoolVar(&cfg.QueryStatsEnabled, "frontend.query-stats-enabled", false, "True to enable query statistics tracking. When enabled, a message with some statistics is logged for every query.") f.BoolVar(&cfg.EnabledRulerQueryStatsLog, "frontend.enabled-ruler-query-stats", false, "If enabled, report the query stats log for queries coming from the ruler to evaluate rules. It only takes effect when '-ruler.frontend-address' is configured.") + f.StringVar(&cfg.ActiveQueryTrackerDir, "frontend.active-query-tracker-dir", "", "Active query tracker monitors active queries, and writes them to the file in given directory. If Cortex discovers any queries in this log during startup, it will log them to the log file. Setting to empty value disables active query tracker, which also disables -frontend.max-concurrent option.") + f.IntVar(&cfg.MaxConcurrent, "frontend.max-concurrent", 500, "The maximum number of concurrent queries running in query frontend.") } // Handler accepts queries and forwards them to RoundTripper. It can log slow queries, @@ -109,6 +114,7 @@ type Handler struct { rejectedQueries *prometheus.CounterVec slowQueries *prometheus.CounterVec activeUsers *util.ActiveUsersCleanupService + activeQueryTracker promql.QueryTracker initSlowQueryMetric sync.Once reg prometheus.Registerer @@ -176,6 +182,10 @@ func NewHandler(cfg HandlerConfig, tenantFederationCfg tenantfederation.Config, _ = h.activeUsers.StartAsync(context.Background()) } + if cfg.ActiveQueryTrackerDir != "" { + h.activeQueryTracker = promql.NewActiveQueryTracker(cfg.ActiveQueryTrackerDir, cfg.MaxConcurrent, util_log.GoKitLogToSlog(log)) + } + return h } @@ -297,6 +307,22 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = io.NopCloser(&buf) } + var trackerIndex int + if f.activeQueryTracker != nil { + path := r.URL.Path + if q := r.Form.Get("query"); q != "" { + queryStr := fmt.Sprintf("[tenant:%s] %s %s", userID, path, q) + trackerIndex, _ = f.activeQueryTracker.Insert(r.Context(), queryStr) + } else if matches := r.Form["match[]"]; len(matches) > 0 { + queryStr := fmt.Sprintf("[tenant:%s] %s %s", userID, path, strings.Join(matches, ",")) + trackerIndex, _ = f.activeQueryTracker.Insert(r.Context(), queryStr) + } else { + queryStr := fmt.Sprintf("[tenant:%s] %s", userID, path) + trackerIndex, _ = f.activeQueryTracker.Insert(r.Context(), queryStr) + } + defer f.activeQueryTracker.Delete(trackerIndex) + } + source := tripperware.GetSource(r.Header.Get("User-Agent")) // Log request if f.cfg.QueryStatsEnabled {