diff --git a/CHANGELOG.md b/CHANGELOG.md index ba472c79dc2..1aa7d663f00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236 * [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256 +* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103 * [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168 * [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145 * [ENHANCEMENT] Ruler: Add `cortex_prometheus_rule_group_last_rule_duration_sum_seconds` metric to track the total evaluation duration of a rule group regardless of concurrency #10189 diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 75a55de735e..cc525776491 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -383,7 +383,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { if userID == 0 && cfg.queryStatsEnabled { res, _, err := c.QueryRaw("{instance=~\"hello.*\"}") require.NoError(t, err) - require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*, bytes_processed;val=[0-9.]*$", res.Header.Values("Server-Timing")[0]) + require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*, bytes_processed;val=[0-9.]*, samples_processed;val=[0-9.]*$", res.Header.Values("Server-Timing")[0]) } // Beyond the range of -querier.query-ingesters-within should return nothing. No need to repeat it for each user. diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 798f4f0e20d..9b9f0800de4 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -281,7 +281,7 @@ func NewQuerierHandler( // This is used for the stats API which we should not support. Or find other ways to. prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }), reg, - nil, + querier.StatsRenderer, remoteWriteEnabled, nil, otlpEnabled, diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 1238a4c5382..ca210588510 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -332,6 +332,7 @@ func (f *Handler) reportQueryStats( "estimated_series_count", stats.GetEstimatedSeriesCount(), "queue_time_seconds", stats.LoadQueueTime().Seconds(), "encode_time_seconds", stats.LoadEncodeTime().Seconds(), + "samples_processed", stats.LoadSamplesProcessed(), }, formatQueryString(details, queryString)...) if details != nil { @@ -485,6 +486,7 @@ func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Head parts = append(parts, statsValue("querier_wall_time", stats.LoadWallTime())) parts = append(parts, statsValue("response_time", queryResponseTime)) parts = append(parts, statsValue("bytes_processed", stats.LoadFetchedChunkBytes()+stats.LoadFetchedIndexBytes())) + parts = append(parts, statsValue("samples_processed", stats.GetSamplesProcessed())) headers.Set(ServiceTimingHeaderName, strings.Join(parts, ", ")) } } diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index 5d8b02d03a8..c3338e1d07b 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -304,6 +304,7 @@ func TestHandler_ServeHTTP(t *testing.T) { expectedReadConsistency: "", assertHeaders: func(t *testing.T, headers http.Header) { assert.Contains(t, headers.Get(ServiceTimingHeaderName), "bytes_processed;val=0") + assert.Contains(t, headers.Get(ServiceTimingHeaderName), "samples_processed;val=0") }, }, } { diff --git a/pkg/querier/remote_read_test.go b/pkg/querier/remote_read_test.go index 0f74bdfdede..0d3eb1c0649 100644 --- a/pkg/querier/remote_read_test.go +++ b/pkg/querier/remote_read_test.go @@ -54,6 +54,10 @@ type mockQuerier struct { selectFn func(ctx context.Context, sorted bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet } +func (m mockQuerier) Close() error { + return nil +} + func (m mockQuerier) Select(ctx context.Context, sorted bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { if m.selectFn != nil { return m.selectFn(ctx, sorted, hints, matchers...) diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index e3e74b7ac21..24587c71a8a 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -203,6 +203,22 @@ func (s *Stats) LoadEncodeTime() time.Duration { return time.Duration(atomic.LoadInt64((*int64)(&s.EncodeTime))) } +func (s *Stats) AddSamplesProcessed(c uint64) { + if s == nil { + return + } + + atomic.AddUint64(&s.SamplesProcessed, c) +} + +func (s *Stats) LoadSamplesProcessed() uint64 { + if s == nil { + return 0 + } + + return atomic.LoadUint64(&s.SamplesProcessed) +} + // Merge the provided Stats into this one. func (s *Stats) Merge(other *Stats) { if s == nil || other == nil { @@ -219,6 +235,7 @@ func (s *Stats) Merge(other *Stats) { s.AddEstimatedSeriesCount(other.LoadEstimatedSeriesCount()) s.AddQueueTime(other.LoadQueueTime()) s.AddEncodeTime(other.LoadEncodeTime()) + s.AddSamplesProcessed(other.LoadSamplesProcessed()) } // Copy returns a copy of the stats. Use this rather than regular struct assignment diff --git a/pkg/querier/stats/stats.pb.go b/pkg/querier/stats/stats.pb.go index 84c43500e4e..60c208a2a4a 100644 --- a/pkg/querier/stats/stats.pb.go +++ b/pkg/querier/stats/stats.pb.go @@ -50,6 +50,8 @@ type Stats struct { QueueTime time.Duration `protobuf:"bytes,9,opt,name=queue_time,json=queueTime,proto3,stdduration" json:"queue_time"` // The time spent at the frontend encoding the query's final results. Does not include time spent serializing results at the querier. EncodeTime time.Duration `protobuf:"bytes,10,opt,name=encode_time,json=encodeTime,proto3,stdduration" json:"encode_time"` + // TotalSamples represents the total number of samples scanned while evaluating a query. + SamplesProcessed uint64 `protobuf:"varint,11,opt,name=samples_processed,json=samplesProcessed,proto3" json:"samples_processed,omitempty"` } func (m *Stats) Reset() { *m = Stats{} } @@ -154,6 +156,13 @@ func (m *Stats) GetEncodeTime() time.Duration { return 0 } +func (m *Stats) GetSamplesProcessed() uint64 { + if m != nil { + return m.SamplesProcessed + } + return 0 +} + func init() { proto.RegisterType((*Stats)(nil), "stats.Stats") } @@ -161,32 +170,34 @@ func init() { func init() { proto.RegisterFile("stats.proto", fileDescriptor_b4756a0aec8b9d44) } var fileDescriptor_b4756a0aec8b9d44 = []byte{ - // 394 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0xb1, 0x4e, 0xc2, 0x40, - 0x18, 0xc7, 0x7b, 0x0a, 0x08, 0x87, 0x68, 0xac, 0xc4, 0x54, 0x86, 0x83, 0xe8, 0x20, 0x53, 0x31, - 0xea, 0xe6, 0x62, 0x0a, 0x8b, 0xa3, 0xe0, 0xe4, 0xd2, 0x94, 0xf6, 0x28, 0x8d, 0xa5, 0x07, 0xbd, - 0x6b, 0xd4, 0xcd, 0x47, 0x70, 0x34, 0xf1, 0x05, 0x7c, 0x14, 0x46, 0x46, 0x26, 0x95, 0xb2, 0x38, - 0xf2, 0x08, 0xe6, 0xee, 0x5a, 0x02, 0x4e, 0x6c, 0xbd, 0xef, 0xf7, 0xfd, 0xee, 0xff, 0xe5, 0xbe, - 0xc2, 0x22, 0x65, 0x16, 0xa3, 0xfa, 0x30, 0x24, 0x8c, 0xa8, 0x59, 0x71, 0xa8, 0x94, 0x5d, 0xe2, - 0x12, 0x51, 0x69, 0xf0, 0x2f, 0x09, 0x2b, 0xc8, 0x25, 0xc4, 0xf5, 0x71, 0x43, 0x9c, 0xba, 0x51, - 0xaf, 0xe1, 0x44, 0xa1, 0xc5, 0x3c, 0x12, 0x48, 0x7e, 0xf2, 0x91, 0x81, 0xd9, 0x0e, 0xf7, 0xd5, - 0x1b, 0x58, 0x78, 0xb2, 0x7c, 0xdf, 0x64, 0xde, 0x00, 0x6b, 0xa0, 0x06, 0xea, 0xc5, 0x8b, 0x63, - 0x5d, 0xda, 0x7a, 0x6a, 0xeb, 0xad, 0xc4, 0x36, 0xf2, 0xe3, 0xaf, 0xaa, 0xf2, 0xfe, 0x5d, 0x05, - 0xed, 0x3c, 0xb7, 0xee, 0xbd, 0x01, 0x56, 0xcf, 0x61, 0xb9, 0x87, 0x99, 0xdd, 0xc7, 0x8e, 0x49, - 0x71, 0xe8, 0x61, 0x6a, 0xda, 0x24, 0x0a, 0x98, 0xb6, 0x55, 0x03, 0xf5, 0x4c, 0x5b, 0x4d, 0x58, - 0x47, 0xa0, 0x26, 0x27, 0xaa, 0x0e, 0x0f, 0x53, 0xc3, 0xee, 0x47, 0xc1, 0xa3, 0xd9, 0x7d, 0x61, - 0x98, 0x6a, 0xdb, 0x42, 0x38, 0x48, 0x50, 0x93, 0x13, 0x83, 0x83, 0xd5, 0x04, 0xd1, 0x9f, 0x26, - 0x64, 0xd6, 0x12, 0x84, 0x90, 0x24, 0x9c, 0xc1, 0x7d, 0xda, 0xb7, 0x42, 0x07, 0x3b, 0xe6, 0x28, - 0x12, 0xc9, 0x5a, 0xb6, 0x06, 0xea, 0xa5, 0xf6, 0x5e, 0x52, 0xbe, 0x93, 0x55, 0xf5, 0x14, 0x96, - 0xe8, 0xd0, 0xf7, 0xd8, 0xb2, 0x2d, 0x27, 0xda, 0x76, 0x45, 0x31, 0x6d, 0x5a, 0x99, 0xd7, 0x0b, - 0x1c, 0xfc, 0x9c, 0xcc, 0xbb, 0xb3, 0x36, 0xef, 0x2d, 0x27, 0x72, 0xde, 0x2b, 0x78, 0x84, 0x29, - 0xf3, 0x06, 0x16, 0xfb, 0xff, 0x26, 0x79, 0xa1, 0x94, 0x97, 0x74, 0xf5, 0x55, 0x0c, 0x08, 0x47, - 0x11, 0x8e, 0xb0, 0x5c, 0x45, 0x61, 0xf3, 0x55, 0x14, 0x84, 0x26, 0x76, 0xd1, 0x82, 0x45, 0x1c, - 0xd8, 0xc4, 0x49, 0x2e, 0x81, 0x9b, 0x5f, 0x02, 0xa5, 0xc7, 0x6f, 0x31, 0xae, 0x27, 0x33, 0xa4, - 0x4c, 0x67, 0x48, 0x59, 0xcc, 0x10, 0x78, 0x8d, 0x11, 0xf8, 0x8c, 0x11, 0x18, 0xc7, 0x08, 0x4c, - 0x62, 0x04, 0x7e, 0x62, 0x04, 0x7e, 0x63, 0xa4, 0x2c, 0x62, 0x04, 0xde, 0xe6, 0x48, 0x99, 0xcc, - 0x91, 0x32, 0x9d, 0x23, 0xe5, 0x41, 0xfe, 0x90, 0xdd, 0x9c, 0x48, 0xb9, 0xfc, 0x0b, 0x00, 0x00, - 0xff, 0xff, 0xa6, 0x97, 0xf0, 0x6e, 0xad, 0x02, 0x00, 0x00, + // 422 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0xb1, 0x8e, 0xd3, 0x40, + 0x10, 0x86, 0xbd, 0x90, 0x1c, 0xc9, 0x9a, 0x03, 0xce, 0x44, 0xc8, 0x5c, 0xb1, 0x17, 0x41, 0x41, + 0x24, 0x24, 0x07, 0x01, 0x1d, 0x0d, 0xf2, 0xa5, 0xa1, 0x83, 0x84, 0x8a, 0xc6, 0x72, 0xec, 0x89, + 0x63, 0x61, 0x7b, 0x1d, 0xef, 0x5a, 0x40, 0xc7, 0x23, 0x50, 0xf2, 0x08, 0x3c, 0x4a, 0xca, 0x94, + 0xa9, 0x80, 0x38, 0x0d, 0x65, 0x1e, 0x01, 0x79, 0x76, 0x1d, 0x25, 0x57, 0xa5, 0xf3, 0xce, 0x37, + 0xdf, 0xce, 0xaf, 0x1d, 0x53, 0x53, 0x48, 0x5f, 0x0a, 0x27, 0x2f, 0xb8, 0xe4, 0x56, 0x1b, 0x0f, + 0x97, 0xbd, 0x88, 0x47, 0x1c, 0x2b, 0xc3, 0xfa, 0x4b, 0xc1, 0x4b, 0x16, 0x71, 0x1e, 0x25, 0x30, + 0xc4, 0xd3, 0xb4, 0x9c, 0x0d, 0xc3, 0xb2, 0xf0, 0x65, 0xcc, 0x33, 0xc5, 0x9f, 0x2c, 0x5b, 0xb4, + 0x3d, 0xa9, 0x7d, 0xeb, 0x2d, 0xed, 0x7e, 0xf1, 0x93, 0xc4, 0x93, 0x71, 0x0a, 0x36, 0xe9, 0x93, + 0x81, 0xf9, 0xf2, 0xb1, 0xa3, 0x6c, 0xa7, 0xb1, 0x9d, 0x91, 0xb6, 0xdd, 0xce, 0xf2, 0xf7, 0x95, + 0xf1, 0xf3, 0xcf, 0x15, 0x19, 0x77, 0x6a, 0xeb, 0x63, 0x9c, 0x82, 0xf5, 0x82, 0xf6, 0x66, 0x20, + 0x83, 0x39, 0x84, 0x9e, 0x80, 0x22, 0x06, 0xe1, 0x05, 0xbc, 0xcc, 0xa4, 0x7d, 0xab, 0x4f, 0x06, + 0xad, 0xb1, 0xa5, 0xd9, 0x04, 0xd1, 0x75, 0x4d, 0x2c, 0x87, 0x3e, 0x6c, 0x8c, 0x60, 0x5e, 0x66, + 0x9f, 0xbd, 0xe9, 0x37, 0x09, 0xc2, 0xbe, 0x8d, 0xc2, 0x85, 0x46, 0xd7, 0x35, 0x71, 0x6b, 0x70, + 0x38, 0x01, 0xfb, 0x9b, 0x09, 0xad, 0xa3, 0x09, 0x28, 0xe8, 0x09, 0xcf, 0xe8, 0x7d, 0x31, 0xf7, + 0x8b, 0x10, 0x42, 0x6f, 0x51, 0xe2, 0x64, 0xbb, 0xdd, 0x27, 0x83, 0xf3, 0xf1, 0x3d, 0x5d, 0xfe, + 0xa0, 0xaa, 0xd6, 0x53, 0x7a, 0x2e, 0xf2, 0x24, 0x96, 0xfb, 0xb6, 0x33, 0x6c, 0xbb, 0x8b, 0xc5, + 0xa6, 0xe9, 0x20, 0x6f, 0x9c, 0x85, 0xf0, 0x55, 0xe7, 0xbd, 0x73, 0x94, 0xf7, 0x5d, 0x4d, 0x54, + 0xde, 0xd7, 0xf4, 0x11, 0x08, 0x19, 0xa7, 0xbe, 0xbc, 0xf9, 0x26, 0x1d, 0x54, 0x7a, 0x7b, 0x7a, + 0xf8, 0x2a, 0x2e, 0xa5, 0x8b, 0x12, 0x4a, 0x50, 0xab, 0xe8, 0x9e, 0xbe, 0x8a, 0x2e, 0x6a, 0xb8, + 0x8b, 0x11, 0x35, 0x21, 0x0b, 0x78, 0xa8, 0x2f, 0xa1, 0xa7, 0x5f, 0x42, 0x95, 0x87, 0xb7, 0x3c, + 0xa7, 0x17, 0xc2, 0x4f, 0xf3, 0x04, 0x84, 0x97, 0x17, 0x3c, 0x00, 0x21, 0x20, 0xb4, 0x4d, 0x8c, + 0xfe, 0x40, 0x83, 0xf7, 0x4d, 0xdd, 0x7d, 0xb3, 0xda, 0x30, 0x63, 0xbd, 0x61, 0xc6, 0x6e, 0xc3, + 0xc8, 0xf7, 0x8a, 0x91, 0x5f, 0x15, 0x23, 0xcb, 0x8a, 0x91, 0x55, 0xc5, 0xc8, 0xdf, 0x8a, 0x91, + 0x7f, 0x15, 0x33, 0x76, 0x15, 0x23, 0x3f, 0xb6, 0xcc, 0x58, 0x6d, 0x99, 0xb1, 0xde, 0x32, 0xe3, + 0x93, 0xfa, 0x7b, 0xa7, 0x67, 0x18, 0xe9, 0xd5, 0xff, 0x00, 0x00, 0x00, 0xff, 0xff, 0x44, 0x02, + 0xa0, 0xdb, 0xda, 0x02, 0x00, 0x00, } func (this *Stats) Equal(that interface{}) bool { @@ -238,13 +249,16 @@ func (this *Stats) Equal(that interface{}) bool { if this.EncodeTime != that1.EncodeTime { return false } + if this.SamplesProcessed != that1.SamplesProcessed { + return false + } return true } func (this *Stats) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 14) + s := make([]string, 0, 15) s = append(s, "&stats.Stats{") s = append(s, "WallTime: "+fmt.Sprintf("%#v", this.WallTime)+",\n") s = append(s, "FetchedSeriesCount: "+fmt.Sprintf("%#v", this.FetchedSeriesCount)+",\n") @@ -256,6 +270,7 @@ func (this *Stats) GoString() string { s = append(s, "EstimatedSeriesCount: "+fmt.Sprintf("%#v", this.EstimatedSeriesCount)+",\n") s = append(s, "QueueTime: "+fmt.Sprintf("%#v", this.QueueTime)+",\n") s = append(s, "EncodeTime: "+fmt.Sprintf("%#v", this.EncodeTime)+",\n") + s = append(s, "SamplesProcessed: "+fmt.Sprintf("%#v", this.SamplesProcessed)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -287,6 +302,11 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.SamplesProcessed != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.SamplesProcessed)) + i-- + dAtA[i] = 0x58 + } n1, err1 := github_com_gogo_protobuf_types.StdDurationMarshalTo(m.EncodeTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdDuration(m.EncodeTime):]) if err1 != nil { return 0, err1 @@ -393,6 +413,9 @@ func (m *Stats) Size() (n int) { n += 1 + l + sovStats(uint64(l)) l = github_com_gogo_protobuf_types.SizeOfStdDuration(m.EncodeTime) n += 1 + l + sovStats(uint64(l)) + if m.SamplesProcessed != 0 { + n += 1 + sovStats(uint64(m.SamplesProcessed)) + } return n } @@ -417,6 +440,7 @@ func (this *Stats) String() string { `EstimatedSeriesCount:` + fmt.Sprintf("%v", this.EstimatedSeriesCount) + `,`, `QueueTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.QueueTime), "Duration", "durationpb.Duration", 1), `&`, ``, 1) + `,`, `EncodeTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.EncodeTime), "Duration", "durationpb.Duration", 1), `&`, ``, 1) + `,`, + `SamplesProcessed:` + fmt.Sprintf("%v", this.SamplesProcessed) + `,`, `}`, }, "") return s @@ -690,6 +714,25 @@ func (m *Stats) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SamplesProcessed", wireType) + } + m.SamplesProcessed = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.SamplesProcessed |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) diff --git a/pkg/querier/stats/stats.proto b/pkg/querier/stats/stats.proto index 1bd0ecda673..35cb2c9303a 100644 --- a/pkg/querier/stats/stats.proto +++ b/pkg/querier/stats/stats.proto @@ -36,4 +36,6 @@ message Stats { google.protobuf.Duration queue_time = 9 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; // The time spent at the frontend encoding the query's final results. Does not include time spent serializing results at the querier. google.protobuf.Duration encode_time = 10 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false]; + // TotalSamples represents the total number of samples scanned while evaluating a query. + uint64 samples_processed = 11; } diff --git a/pkg/querier/stats/stats_test.go b/pkg/querier/stats/stats_test.go index f45e90187ed..79d7c8670fc 100644 --- a/pkg/querier/stats/stats_test.go +++ b/pkg/querier/stats/stats_test.go @@ -132,6 +132,23 @@ func TestStats_QueueTime(t *testing.T) { }) } +func TestStats_SamplesProcessed(t *testing.T) { + t.Run("add and load samples processed", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + stats.AddSamplesProcessed(10) + stats.AddSamplesProcessed(20) + + assert.Equal(t, uint64(30), stats.LoadSamplesProcessed()) + }) + + t.Run("add and load samples processed nil receiver", func(t *testing.T) { + var stats *Stats + stats.AddSamplesProcessed(10) + + assert.Equal(t, uint64(0), stats.LoadSamplesProcessed()) + }) +} + func TestStats_Merge(t *testing.T) { t.Run("merge two stats objects", func(t *testing.T) { stats1 := &Stats{} @@ -142,6 +159,7 @@ func TestStats_Merge(t *testing.T) { stats1.AddShardedQueries(20) stats1.AddSplitQueries(10) stats1.AddQueueTime(5 * time.Second) + stats1.AddSamplesProcessed(10) stats2 := &Stats{} stats2.AddWallTime(time.Second) @@ -151,6 +169,7 @@ func TestStats_Merge(t *testing.T) { stats2.AddShardedQueries(21) stats2.AddSplitQueries(11) stats2.AddQueueTime(10 * time.Second) + stats2.AddSamplesProcessed(20) stats1.Merge(stats2) @@ -161,6 +180,7 @@ func TestStats_Merge(t *testing.T) { assert.Equal(t, uint32(41), stats1.LoadShardedQueries()) assert.Equal(t, uint32(21), stats1.LoadSplitQueries()) assert.Equal(t, 15*time.Second, stats1.LoadQueueTime()) + assert.Equal(t, uint64(30), stats1.LoadSamplesProcessed()) }) t.Run("merge two nil stats objects", func(t *testing.T) { @@ -176,6 +196,7 @@ func TestStats_Merge(t *testing.T) { assert.Equal(t, uint32(0), stats1.LoadShardedQueries()) assert.Equal(t, uint32(0), stats1.LoadSplitQueries()) assert.Equal(t, time.Duration(0), stats1.LoadQueueTime()) + assert.Equal(t, uint64(0), stats1.LoadSamplesProcessed()) }) } @@ -190,6 +211,7 @@ func TestStats_Copy(t *testing.T) { FetchedIndexBytes: 7, EstimatedSeriesCount: 8, QueueTime: 9, + SamplesProcessed: 10, } s2 := s1.Copy() assert.NotSame(t, s1, s2) diff --git a/pkg/querier/stats_renderer.go b/pkg/querier/stats_renderer.go new file mode 100644 index 00000000000..e0061afbd4a --- /dev/null +++ b/pkg/querier/stats_renderer.go @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package querier + +import ( + "context" + + promql_stats "github.com/prometheus/prometheus/util/stats" + prom_api "github.com/prometheus/prometheus/web/api/v1" + + "github.com/grafana/mimir/pkg/querier/stats" +) + +func StatsRenderer(ctx context.Context, s *promql_stats.Statistics, param string) promql_stats.QueryStats { + mimirStats := stats.FromContext(ctx) + if mimirStats != nil && s != nil { + mimirStats.AddSamplesProcessed(uint64(s.Samples.TotalSamples)) + } + return prom_api.DefaultStatsRenderer(ctx, s, param) +} diff --git a/pkg/querier/stats_renderer_test.go b/pkg/querier/stats_renderer_test.go new file mode 100644 index 00000000000..eb27f893343 --- /dev/null +++ b/pkg/querier/stats_renderer_test.go @@ -0,0 +1,141 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package querier + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/grafana/dskit/user" + "github.com/grafana/regexp" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/common/promslog" + "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/promqltest" + v1 "github.com/prometheus/prometheus/web/api/v1" + "github.com/stretchr/testify/require" + + mimir_stats "github.com/grafana/mimir/pkg/querier/stats" +) + +// Mimir doesn't support Prometheus' UTF-8 metric/label name scheme yet. +func init() { + model.NameValidationScheme = model.LegacyValidation +} + +func TestStatsRenderer(t *testing.T) { + + testCases := map[string]struct { + expr string + expectedTotalSamples uint64 + }{ + "zero_series": { + expr: `zero_series{}`, + expectedTotalSamples: 0, + }, + "dense_series": { + expr: `dense_series{}`, + expectedTotalSamples: 11, + }, + "start_series": { + expr: `start_series{}`, + expectedTotalSamples: 6, + }, + "end_series": { + expr: `end_series{}`, + expectedTotalSamples: 6, + }, + "sparse_series": { + expr: `sparse_series{}`, + expectedTotalSamples: 5 + 4, + }, + } + + engine := promql.NewEngine(promql.EngineOpts{ + Logger: nil, + MaxSamples: 100, + Timeout: 5 * time.Second, + }) + + start := timestamp.Time(0) + end := start.Add(10 * time.Minute) + step := 1 * time.Minute + + storage := promqltest.LoadedStorage(t, ` + load 1m + dense_series 0 1 2 3 4 5 6 7 8 9 10 + start_series 0 1 _ _ _ _ _ _ _ _ _ + end_series _ _ _ _ _ 5 6 7 8 9 10 + sparse_series 0 _ _ _ _ _ _ 7 _ _ _ + `) + t.Cleanup(func() { storage.Close() }) + + api := v1.NewAPI( + engine, + storage, + nil, + nil, + func(context.Context) v1.ScrapePoolsRetriever { return &DummyTargetRetriever{} }, + func(context.Context) v1.TargetRetriever { return &DummyTargetRetriever{} }, + func(context.Context) v1.AlertmanagerRetriever { return &DummyAlertmanagerRetriever{} }, + func() config.Config { return config.Config{} }, + map[string]string{}, + v1.GlobalURLOptions{}, + func(f http.HandlerFunc) http.HandlerFunc { return f }, + nil, // Only needed for admin APIs. + "", // This is for snapshots, which is disabled when admin APIs are disabled. Hence empty. + false, // Disable admin APIs. + promslog.NewNopLogger(), + func(context.Context) v1.RulesRetriever { return &DummyRulesRetriever{} }, + 0, 0, 0, // Remote read samples and concurrency limit. + false, // Not an agent. + regexp.MustCompile(".*"), + func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, nil }, + &v1.PrometheusVersion{}, + nil, + nil, + prometheus.DefaultGatherer, + nil, + StatsRenderer, + false, + nil, + false, + false, + 0, + ) + promRouter := route.New().WithPrefix("/api/v1") + + api.Register(promRouter) + + runQuery := func(expr string) *mimir_stats.Stats { + rec := httptest.NewRecorder() + + req := httptest.NewRequest("GET", fmt.Sprintf("/api/v1/query_range?query=%s&start=%d&end=%d&step=%ds", expr, start.Unix(), end.Unix(), int(step.Seconds())), nil) + ctx := context.Background() + _, ctx = mimir_stats.ContextWithEmptyStats(ctx) + req = req.WithContext(user.InjectOrgID(ctx, "test org")) + + promRouter.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + return mimir_stats.FromContext(ctx) + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + + stats := runQuery(tc.expr) + + require.NotNil(t, stats) + require.Equal(t, tc.expectedTotalSamples, stats.LoadSamplesProcessed()) + }) + } +}