Skip to content

Commit

Permalink
Add support for Prometheus rule query offset (cortexproject#6085)
Browse files Browse the repository at this point in the history
* Add support for prometheus rule query offset

Signed-off-by: Mustafain Ali Khan <[email protected]>

* Fix tests

Signed-off-by: Mustafain Ali Khan <[email protected]>

* Use per-tenant limit for global query offset

Signed-off-by: Mustafain Ali Khan <[email protected]>

---------

Signed-off-by: Mustafain Ali Khan <[email protected]>
  • Loading branch information
mustafain117 authored Jul 30, 2024
1 parent 8ce3642 commit 85bc555
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081
* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104
* [FEATURE] Store Gateway: Token bucket limiter. #6016
* [FEATURE] Ruler: Add support for `query_offset` field on RuleGroup and new `ruler_query_offset` per-tenant limit. #6085
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3349,6 +3349,10 @@ query_rejection:
# CLI flag: -ruler.max-rule-groups-per-tenant
[ruler_max_rule_groups_per_tenant: <int> | default = 0]

# Duration to offset all rule evaluation queries per-tenant.
# CLI flag: -ruler.query-offset
[ruler_query_offset: <duration> | default = 0s]

# The default tenant's shard size when the shuffle-sharding strategy is used.
# Must be set when the store-gateway sharding is enabled with the
# shuffle-sharding strategy. When this setting is specified in the per-tenant
Expand Down
19 changes: 16 additions & 3 deletions pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ interval: 15s
err: errors.New("invalid rules config: rule group 'rg_name' has no rules"),
},
{
name: "with a a valid rules file",
name: "with a valid rules file",
status: 202,
input: `
name: test
Expand All @@ -279,7 +279,20 @@ rules:
labels:
test: test
`,
output: "name: test\ninterval: 15s\nrules:\n - record: up_rule\n expr: up{}\n - alert: up_alert\n expr: sum(up{}) > 1\n for: 30s\n labels:\n test: test\n annotations:\n test: test\n",
output: "name: test\ninterval: 15s\nquery_offset: 0s\nrules:\n - record: up_rule\n expr: up{}\n - alert: up_alert\n expr: sum(up{}) > 1\n for: 30s\n labels:\n test: test\n annotations:\n test: test\n",
},
{
name: "with a valid rule query offset",
status: 202,
input: `
name: test
interval: 15s
query_offset: 2m
rules:
- record: up_rule
expr: up{}
`,
output: "name: test\ninterval: 15s\nquery_offset: 2m\nrules:\n - record: up_rule\n expr: up{}\n",
},
}

Expand Down Expand Up @@ -329,7 +342,7 @@ func TestRuler_DeleteNamespace(t *testing.T) {

router.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
require.Equal(t, "name: group1\ninterval: 1m\nrules:\n - record: UP_RULE\n expr: up\n - alert: UP_ALERT\n expr: up < 1\n", w.Body.String())
require.Equal(t, "name: group1\ninterval: 1m\nquery_offset: 0s\nrules:\n - record: UP_RULE\n expr: up\n - alert: UP_ALERT\n expr: up < 1\n", w.Body.String())

// Delete namespace1
req = requestFor(t, http.MethodDelete, "https://localhost:8080/api/v1/rules/namespace1", nil, "user1")
Expand Down
4 changes: 4 additions & 0 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ type RulesLimits interface {
RulerTenantShardSize(userID string) int
RulerMaxRuleGroupsPerTenant(userID string) int
RulerMaxRulesPerRuleGroup(userID string) int
RulerQueryOffset(userID string) time.Duration
DisabledRuleGroups(userID string) validation.DisabledRuleGroups
}

Expand Down Expand Up @@ -358,6 +359,9 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
ResendDelay: cfg.ResendDelay,
ConcurrentEvalsEnabled: cfg.ConcurrentEvalsEnabled,
MaxConcurrentEvals: cfg.MaxConcurrentEvals,
DefaultRuleQueryOffset: func() time.Duration {
return overrides.RulerQueryOffset(userID)
},
})
}
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,13 +911,15 @@ func (r *Ruler) getLocalRules(userID string, rulesRequest RulesRequest, includeB
}
interval := group.Interval()

queryOffset := group.QueryOffset()
groupDesc := &GroupStateDesc{
Group: &rulespb.RuleGroupDesc{
Name: group.Name(),
Namespace: string(decodedNamespace),
Interval: interval,
User: userID,
Limit: int64(group.Limit()),
Name: group.Name(),
Namespace: string(decodedNamespace),
Interval: interval,
User: userID,
Limit: int64(group.Limit()),
QueryOffset: &queryOffset,
},

EvaluationTimestamp: group.GetLastEvaluation(),
Expand Down
37 changes: 37 additions & 0 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ruleLimits struct {
maxRuleGroups int
disabledRuleGroups validation.DisabledRuleGroups
maxQueryLength time.Duration
queryOffset time.Duration
}

func (r ruleLimits) EvaluationDelay(_ string) time.Duration {
Expand All @@ -112,6 +113,10 @@ func (r ruleLimits) DisabledRuleGroups(userID string) validation.DisabledRuleGro

func (r ruleLimits) MaxQueryLength(_ string) time.Duration { return r.maxQueryLength }

func (r ruleLimits) RulerQueryOffset(_ string) time.Duration {
return r.queryOffset
}

func newEmptyQueryable() storage.Queryable {
return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
return emptyQuerier{}, nil
Expand Down Expand Up @@ -2533,3 +2538,35 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
})
}
}

func TestRuler_QueryOffset(t *testing.T) {
store := newMockRuleStore(mockRulesQueryOffset, nil)
cfg := defaultRulerConfig(t)

r := newTestRuler(t, cfg, store, nil)
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

ctx := user.InjectOrgID(context.Background(), "user1")
rls, err := r.Rules(ctx, &RulesRequest{})
require.NoError(t, err)
require.Len(t, rls.Groups, 1)
rg := rls.Groups[0]
expectedRg := mockRulesQueryOffset["user1"][0]
compareRuleGroupDescToStateDesc(t, expectedRg, rg)

// test default query offset=0 when not defined at group level
gotOffset := rg.GetGroup().QueryOffset
require.Equal(t, time.Duration(0), *gotOffset)

ctx = user.InjectOrgID(context.Background(), "user2")
rls, err = r.Rules(ctx, &RulesRequest{})
require.NoError(t, err)
require.Len(t, rls.Groups, 1)
rg = rls.Groups[0]
expectedRg = mockRules["user2"][0]
compareRuleGroupDescToStateDesc(t, expectedRg, rg)

// test group query offset is set
gotOffset = rg.GetGroup().QueryOffset
require.Equal(t, time.Minute*2, *gotOffset)
}
31 changes: 21 additions & 10 deletions pkg/ruler/rulespb/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ import (

// ToProto transforms a formatted prometheus rulegroup to a rule group protobuf
func ToProto(user string, namespace string, rl rulefmt.RuleGroup) *RuleGroupDesc {
var queryOffset *time.Duration
if rl.QueryOffset != nil {
offset := time.Duration(*rl.QueryOffset)
queryOffset = &offset
}
rg := RuleGroupDesc{
Name: rl.Name,
Namespace: namespace,
Interval: time.Duration(rl.Interval),
Rules: formattedRuleToProto(rl.Rules),
User: user,
Limit: int64(rl.Limit),
Name: rl.Name,
Namespace: namespace,
Interval: time.Duration(rl.Interval),
Rules: formattedRuleToProto(rl.Rules),
User: user,
Limit: int64(rl.Limit),
QueryOffset: queryOffset,
}
return &rg
}
Expand All @@ -43,11 +49,16 @@ func formattedRuleToProto(rls []rulefmt.RuleNode) []*RuleDesc {

// FromProto generates a rulefmt RuleGroup
func FromProto(rg *RuleGroupDesc) rulefmt.RuleGroup {
var queryOffset model.Duration
if rg.QueryOffset != nil {
queryOffset = model.Duration(*rg.QueryOffset)
}
formattedRuleGroup := rulefmt.RuleGroup{
Name: rg.GetName(),
Interval: model.Duration(rg.Interval),
Rules: make([]rulefmt.RuleNode, len(rg.GetRules())),
Limit: int(rg.Limit),
Name: rg.GetName(),
Interval: model.Duration(rg.Interval),
Rules: make([]rulefmt.RuleNode, len(rg.GetRules())),
Limit: int(rg.Limit),
QueryOffset: &queryOffset,
}

for i, rl := range rg.GetRules() {
Expand Down
10 changes: 7 additions & 3 deletions pkg/ruler/rulespb/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@ func TestProto(t *testing.T) {

rules = append(rules, testRule)

queryOffset := model.Duration(30 * time.Second)
rg := rulefmt.RuleGroup{
Name: "group1",
Rules: rules,
Interval: model.Duration(time.Minute),
Name: "group1",
Rules: rules,
Interval: model.Duration(time.Minute),
QueryOffset: &queryOffset,
}

desc := ToProto("test", "namespace", rg)

assert.Equal(t, len(rules), len(desc.Rules))
assert.Equal(t, 30*time.Second, *desc.QueryOffset)

ruleDesc := desc.Rules[0]

Expand Down
Loading

0 comments on commit 85bc555

Please sign in to comment.