Skip to content

Commit

Permalink
[querier] honor querier mint,maxt if no SelectHints are passed to Sel…
Browse files Browse the repository at this point in the history
…ect (#4413)

* [querier] honor querier mint,maxt if no SelectHints are passed to Select

Signed-off-by: Abdurrahman J. Allawala <[email protected]>

* export mockDistributor for later use in the ruler tests

Signed-off-by: Abdurrahman J. Allawala <[email protected]>

* add test for restoring FOR state in the ruler via distributors

Signed-off-by: Abdurrahman J. Allawala <[email protected]>

* test for always active alert

Signed-off-by: Abdurrahman J. Allawala <[email protected]>

* move querier/testutils to querier pkg

Signed-off-by: Abdurrahman J. Allawala <[email protected]>

* add empty chunk store to ruler test

Signed-off-by: Abdurrahman J. Allawala <[email protected]>

* add changelog entry

Signed-off-by: Abdurrahman J. Allawala <[email protected]>

* lint

Signed-off-by: Abdurrahman J. Allawala <[email protected]>

* honor mint,maxt if sp is null for other querier implementations

Signed-off-by: Abdurrahman J. Allawala <[email protected]>

* missed addition in rebase

Signed-off-by: Abdurrahman J. Allawala <[email protected]>

Co-authored-by: Alvin Lin <[email protected]>
  • Loading branch information
aallawala and alvinlin123 authored Sep 17, 2021
1 parent 83d15b5 commit e65da18
Show file tree
Hide file tree
Showing 11 changed files with 309 additions and 113 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
* [BUGFIX] Memberlist: forward only changes, not entire original message. #4419
* [BUGFIX] Memberlist: don't accept old tombstones as incoming change, and don't forward such messages to other gossip members. #4420
* [BUGFIX] Querier: fixed panic when querying exemplars and using `-distributor.shard-by-all-labels=false`. #4473
* [BUGFIX] Querier: honor querier minT,maxT if `nil` SelectHints are passed to Select(). #4413
* [BUGFIX] Compactor: fixed panic while collecting Prometheus metrics. #4483


## 1.10.0 / 2021-08-03

* [CHANGE] Prevent path traversal attack from users able to control the HTTP header `X-Scope-OrgID`. #4375 (CVE-2021-36157)
Expand Down
9 changes: 7 additions & 2 deletions pkg/querier/chunk_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,19 @@ func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...
return storage.ErrSeriesSet(err)
}

minT, maxT := q.mint, q.maxt
if sp != nil {
minT, maxT = sp.Start, sp.End
}

// We will hit this for /series lookup when -querier.query-store-for-labels-enabled is set.
// If we don't skip here, it'll make /series lookups extremely slow as all the chunks will be loaded.
// That flag is only to be set with blocks storage engine, and this is a protective measure.
if sp == nil || sp.Func == "series" {
if sp != nil && sp.Func == "series" {
return storage.EmptySeriesSet()
}

chunks, err := q.store.Get(q.ctx, userID, model.Time(sp.Start), model.Time(sp.End), matchers...)
chunks, err := q.store.Get(q.ctx, userID, model.Time(minT), model.Time(maxT), matchers...)
if err != nil {
return storage.ErrSeriesSet(err)
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,26 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
log, ctx := spanlogger.New(q.ctx, "distributorQuerier.Select")
defer log.Span.Finish()

// Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation,
// which needs only metadata. For this specific case we shouldn't apply the queryIngestersWithin
minT, maxT := q.mint, q.maxt
if sp != nil {
minT, maxT = sp.Start, sp.End
}

// If the querier receives a 'series' query, it means only metadata is needed.
// For this specific case we shouldn't apply the queryIngestersWithin
// time range manipulation, otherwise we'll end up returning no series at all for
// older time ranges (while in Cortex we do ignore the start/end and always return
// series in ingesters).
// Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series".
// See: https://github.com/prometheus/prometheus/pull/8050
if sp == nil || sp.Func == "series" {
if sp != nil && sp.Func == "series" {
ms, err := q.distributor.MetricsForLabelMatchers(ctx, model.Time(q.mint), model.Time(q.maxt), matchers...)
if err != nil {
return storage.ErrSeriesSet(err)
}
return series.MetricsToSeriesSet(ms)
}

minT, maxT := sp.Start, sp.End

// If queryIngestersWithin is enabled, we do manipulate the query mint to query samples up until
// now - queryIngestersWithin, because older time ranges are covered by the storage. This
// optimization is particularly important for the blocks storage where the blocks retention in the
Expand Down
53 changes: 9 additions & 44 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand All @@ -29,7 +28,7 @@ const (
)

func TestDistributorQuerier(t *testing.T) {
d := &mockDistributor{}
d := &MockDistributor{}
d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
model.Matrix{
// Matrixes are unsorted, so this tests that the labels get sorted.
Expand Down Expand Up @@ -117,7 +116,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
for _, streamingEnabled := range []bool{false, true} {
for testName, testData := range tests {
t.Run(fmt.Sprintf("%s (streaming enabled: %t)", testName, streamingEnabled), func(t *testing.T) {
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil)
Expand All @@ -127,10 +126,10 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT)
require.NoError(t, err)

// Select hints are not passed by Prometheus when querying /series.
// Select hints are passed by Prometheus when querying /series.
var hints *storage.SelectHints
if !testData.querySeries {
hints = &storage.SelectHints{Start: testData.queryMinT, End: testData.queryMaxT}
if testData.querySeries {
hints = &storage.SelectHints{Func: "series"}
}

seriesSet := querier.Select(true, hints)
Expand All @@ -149,7 +148,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
}

func TestDistributorQueryableFilter(t *testing.T) {
d := &mockDistributor{}
d := &MockDistributor{}
dq := newDistributorQueryable(d, false, nil, 1*time.Hour)

now := time.Now()
Expand All @@ -175,7 +174,7 @@ func TestIngesterStreaming(t *testing.T) {
})
require.NoError(t, err)

d := &mockDistributor{}
d := &MockDistributor{}
d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&client.QueryStreamResponse{
Chunkseries: []client.TimeSeriesChunk{
Expand Down Expand Up @@ -244,7 +243,7 @@ func TestIngesterStreamingMixedResults(t *testing.T) {
{Value: 5.5, TimestampMs: 5500},
}

d := &mockDistributor{}
d := &MockDistributor{}
d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&client.QueryStreamResponse{
Chunkseries: []client.TimeSeriesChunk{
Expand Down Expand Up @@ -316,7 +315,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) {
{Metric: model.Metric{"job": "baz"}},
{Metric: model.Metric{"job": "baz", "foo": "boom"}},
}
d := &mockDistributor{}
d := &MockDistributor{}
d.On("MetricsForLabelMatchers", mock.Anything, model.Time(mint), model.Time(maxt), someMatchers).
Return(metrics, nil)

Expand Down Expand Up @@ -350,37 +349,3 @@ func convertToChunks(t *testing.T, samples []cortexpb.Sample) []client.Chunk {

return clientChunks
}

type mockDistributor struct {
mock.Mock
}

func (m *mockDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
args := m.Called(ctx, from, to, matchers)
return args.Get(0).(model.Matrix), args.Error(1)
}
func (m *mockDistributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*client.ExemplarQueryResponse, error) {
args := m.Called(ctx, from, to, matchers)
return args.Get(0).(*client.ExemplarQueryResponse), args.Error(1)
}
func (m *mockDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
args := m.Called(ctx, from, to, matchers)
return args.Get(0).(*client.QueryStreamResponse), args.Error(1)
}
func (m *mockDistributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, lbl model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
args := m.Called(ctx, from, to, lbl, matchers)
return args.Get(0).([]string), args.Error(1)
}
func (m *mockDistributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) {
args := m.Called(ctx, from, to)
return args.Get(0).([]string), args.Error(1)
}
func (m *mockDistributor) MetricsForLabelMatchers(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
args := m.Called(ctx, from, to, matchers)
return args.Get(0).([]metric.Metric), args.Error(1)
}

func (m *mockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
args := m.Called(ctx)
return args.Get(0).([]scrape.MetricMetadata), args.Error(1)
}
4 changes: 2 additions & 2 deletions pkg/querier/metadata_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func TestMetadataHandler_Success(t *testing.T) {
d := &mockDistributor{}
d := &MockDistributor{}
d.On("MetricsMetadata", mock.Anything).Return(
[]scrape.MetricMetadata{
{Metric: "alertmanager_dispatcher_aggregation_groups", Help: "Number of active aggregation groups", Type: "gauge", Unit: ""},
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestMetadataHandler_Success(t *testing.T) {
}

func TestMetadataHandler_Error(t *testing.T) {
d := &mockDistributor{}
d := &MockDistributor{}
d.On("MetricsMetadata", mock.Anything).Return([]scrape.MetricMetadata{}, fmt.Errorf("no user id"))

handler := MetadataHandler(d)
Expand Down
16 changes: 9 additions & 7 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,15 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat
level.Debug(log).Log("start", util.TimeFromMillis(sp.Start).UTC().String(), "end", util.TimeFromMillis(sp.End).UTC().String(), "step", sp.Step, "matchers", matchers)
}

// Kludge: Prometheus passes nil SelectHints if it is doing a 'series' operation,
// which needs only metadata. Here we expect that metadataQuerier querier will handle that.
// In Cortex it is not feasible to query entire history (with no mint/maxt), so we only ask ingesters and skip
// querying the long-term storage.
// Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series".
// See: https://github.com/prometheus/prometheus/pull/8050
if (sp == nil || sp.Func == "series") && !q.queryStoreForLabels {
if sp == nil {
// if SelectHints is null, rely on minT, maxT of querier to scope in range for Select stmt
sp = &storage.SelectHints{Start: q.mint, End: q.maxt}
} else if sp.Func == "series" && !q.queryStoreForLabels {
// Else if the querier receives a 'series' query, it means only metadata is needed.
// Here we expect that metadataQuerier querier will handle that.
// Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series".
// See: https://github.com/prometheus/prometheus/pull/8050

// In this case, the query time range has already been validated when the querier has been
// created.
return q.metadataQuerier.Select(true, sp, matchers...)
Expand Down
34 changes: 14 additions & 20 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestQuerier(t *testing.T) {
chunkStore, through := makeMockChunkStore(t, chunks, encoding.e)
distributor := mockDistibutorFor(t, chunkStore, through)

overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil)
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
require.NoError(t, err)

queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore)), UseAlwaysQueryable(db)}
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestNoHistoricalQueryToIngester(t *testing.T) {
chunkStore, _ := makeMockChunkStore(t, 24, encodings[0].e)
distributor := &errDistributor{}

overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil)
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
require.NoError(t, err)

queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))}, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
Expand Down Expand Up @@ -364,11 +364,11 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) {
t.Run(fmt.Sprintf("%s (ingester streaming enabled = %t)", name, cfg.IngesterStreaming), func(t *testing.T) {
// We don't need to query any data for this test, so an empty store is fine.
chunkStore := &emptyChunkStore{}
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)

overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil)
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
require.NoError(t, err)

queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))}
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) {
var cfg Config
flagext.DefaultValues(&cfg)

limits := defaultLimitsConfig()
limits := DefaultLimitsConfig()
limits.MaxQueryLength = model.Duration(maxQueryLength)
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -560,7 +560,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
flagext.DefaultValues(&cfg)
cfg.IngesterStreaming = ingesterStreaming

limits := defaultLimitsConfig()
limits := DefaultLimitsConfig()
limits.MaxQueryLookback = testData.maxQueryLookback
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)
Expand All @@ -570,7 +570,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))}

t.Run("query range", func(t *testing.T) {
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)

Expand Down Expand Up @@ -599,7 +599,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
})

t.Run("series", func(t *testing.T) {
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil)

queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
Expand Down Expand Up @@ -631,7 +631,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
})

t.Run("label names", func(t *testing.T) {
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("LabelNames", mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil)

queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
Expand All @@ -658,7 +658,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchNotEqual, "route", "get_user"),
}
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, matchers).Return([]metric.Metric{}, nil)

queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
Expand All @@ -684,7 +684,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
})

t.Run("label values", func(t *testing.T) {
distributor := &mockDistributor{}
distributor := &MockDistributor{}
distributor.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil)

queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
Expand Down Expand Up @@ -713,7 +713,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {

// mockDistibutorFor duplicates the chunks in the mockChunkStore into the mockDistributor
// so we can test everything is dedupe correctly.
func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *mockDistributor {
func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *MockDistributor {
chunks, err := chunkcompat.ToChunks(cs.chunks)
require.NoError(t, err)

Expand All @@ -724,7 +724,7 @@ func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *moc
matrix, err := chunk.ChunksToMatrix(context.Background(), cs.chunks, 0, through)
require.NoError(t, err)

result := &mockDistributor{}
result := &MockDistributor{}
result.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(matrix, nil)
result.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{Chunkseries: []client.TimeSeriesChunk{tsc}}, nil)
return result
Expand Down Expand Up @@ -902,7 +902,7 @@ func TestShortTermQueryToLTS(t *testing.T) {
chunkStore := &emptyChunkStore{}
distributor := &errDistributor{}

overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil)
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
require.NoError(t, err)

queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))}, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
Expand Down Expand Up @@ -1020,9 +1020,3 @@ func (m *mockQueryableWithFilter) UseQueryable(_ time.Time, _, _ int64) bool {
m.useQueryableCalled = true
return true
}

func defaultLimitsConfig() validation.Limits {
limits := validation.Limits{}
flagext.DefaultValues(&limits)
return limits
}
Loading

0 comments on commit e65da18

Please sign in to comment.