Skip to content

Commit 0d621ff

Browse files
authored
Add dedicated instant/range query handlers (#6763)
Signed-off-by: SungJin1212 <[email protected]>
1 parent d20600c commit 0d621ff

File tree

8 files changed

+710
-46
lines changed

8 files changed

+710
-46
lines changed

pkg/api/handlers.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/weaveworks/common/instrument"
2626
"github.com/weaveworks/common/middleware"
2727

28+
"github.com/cortexproject/cortex/pkg/api/queryapi"
2829
"github.com/cortexproject/cortex/pkg/querier"
2930
"github.com/cortexproject/cortex/pkg/querier/codec"
3031
"github.com/cortexproject/cortex/pkg/querier/stats"
@@ -195,10 +196,13 @@ func NewQuerierHandler(
195196
Help: "Current number of inflight requests to the querier.",
196197
}, []string{"method", "route"})
197198

199+
statsRenderer := querier.StatsRenderer
200+
corsOrigin := regexp.MustCompile(".*")
201+
translateSampleAndChunkQueryable := querier.NewErrorTranslateSampleAndChunkQueryable(queryable)
198202
api := v1.NewAPI(
199203
engine,
200-
querier.NewErrorTranslateSampleAndChunkQueryable(queryable), // Translate errors to errors expected by API.
201-
nil, // No remote write support.
204+
translateSampleAndChunkQueryable, // Translate errors to errors expected by API.
205+
nil, // No remote write support.
202206
exemplarQueryable,
203207
func(ctx context.Context) v1.ScrapePoolsRetriever { return nil },
204208
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
@@ -214,7 +218,7 @@ func NewQuerierHandler(
214218
func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} },
215219
0, 0, 0, // Remote read samples and concurrency limit.
216220
false,
217-
regexp.MustCompile(".*"),
221+
corsOrigin,
218222
func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") },
219223
&v1.PrometheusVersion{
220224
Version: version.Version,
@@ -229,7 +233,7 @@ func NewQuerierHandler(
229233
// This is used for the stats API which we should not support. Or find other ways to.
230234
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
231235
reg,
232-
querier.StatsRenderer,
236+
statsRenderer,
233237
false,
234238
nil,
235239
false,
@@ -240,11 +244,18 @@ func NewQuerierHandler(
240244
api.ClearCodecs()
241245
cm := codec.NewInstrumentedCodecMetrics(reg)
242246

243-
api.InstallCodec(codec.NewInstrumentedCodec(v1.JSONCodec{}, cm))
244-
// Install Protobuf codec to give the option for using either.
245-
api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm))
246-
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
247-
api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm))
247+
codecs := []v1.Codec{
248+
codec.NewInstrumentedCodec(v1.JSONCodec{}, cm),
249+
// Protobuf codec to give the option for using either.
250+
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm),
251+
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
252+
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm),
253+
}
254+
255+
// Install codecs
256+
for _, c := range codecs {
257+
api.InstallCodec(c)
258+
}
248259

249260
router := mux.NewRouter()
250261

@@ -269,13 +280,15 @@ func NewQuerierHandler(
269280
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
270281
api.Register(legacyPromRouter)
271282

283+
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
284+
272285
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
273286
// https://github.com/prometheus/prometheus/pull/7125/files
274287
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
275288
router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
276289
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
277-
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter)
278-
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(promRouter)
290+
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
291+
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
279292
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
280293
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
281294
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
@@ -287,8 +300,8 @@ func NewQuerierHandler(
287300
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
288301
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
289302
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
290-
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(legacyPromRouter)
291-
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(legacyPromRouter)
303+
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
304+
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
292305
router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter)
293306
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter)
294307
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter)

pkg/api/queryapi/query_api.go

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
package queryapi
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"time"
8+
9+
"github.com/go-kit/log"
10+
"github.com/go-kit/log/level"
11+
"github.com/grafana/regexp"
12+
"github.com/munnerz/goautoneg"
13+
"github.com/prometheus/prometheus/promql"
14+
"github.com/prometheus/prometheus/storage"
15+
"github.com/prometheus/prometheus/util/annotations"
16+
"github.com/prometheus/prometheus/util/httputil"
17+
v1 "github.com/prometheus/prometheus/web/api/v1"
18+
"github.com/weaveworks/common/httpgrpc"
19+
20+
"github.com/cortexproject/cortex/pkg/util"
21+
"github.com/cortexproject/cortex/pkg/util/api"
22+
)
23+
24+
type QueryAPI struct {
25+
queryable storage.SampleAndChunkQueryable
26+
queryEngine promql.QueryEngine
27+
now func() time.Time
28+
statsRenderer v1.StatsRenderer
29+
logger log.Logger
30+
codecs []v1.Codec
31+
CORSOrigin *regexp.Regexp
32+
}
33+
34+
func NewQueryAPI(
35+
qe promql.QueryEngine,
36+
q storage.SampleAndChunkQueryable,
37+
statsRenderer v1.StatsRenderer,
38+
logger log.Logger,
39+
codecs []v1.Codec,
40+
CORSOrigin *regexp.Regexp,
41+
) *QueryAPI {
42+
return &QueryAPI{
43+
queryEngine: qe,
44+
queryable: q,
45+
statsRenderer: statsRenderer,
46+
logger: logger,
47+
codecs: codecs,
48+
CORSOrigin: CORSOrigin,
49+
now: time.Now,
50+
}
51+
}
52+
53+
func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
54+
// TODO(Sungjin1212): Change to emit basic error (not gRPC)
55+
start, err := util.ParseTime(r.FormValue("start"))
56+
if err != nil {
57+
return invalidParamError(err, "start")
58+
}
59+
end, err := util.ParseTime(r.FormValue("end"))
60+
if err != nil {
61+
return invalidParamError(err, "end")
62+
}
63+
if end < start {
64+
return invalidParamError(ErrEndBeforeStart, "end")
65+
}
66+
67+
step, err := util.ParseDurationMs(r.FormValue("step"))
68+
if err != nil {
69+
return invalidParamError(err, "step")
70+
}
71+
72+
if step <= 0 {
73+
return invalidParamError(ErrNegativeStep, "step")
74+
}
75+
76+
// For safety, limit the number of returned points per timeseries.
77+
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
78+
if (end-start)/step > 11000 {
79+
return apiFuncResult{nil, &apiError{errorBadData, ErrStepTooSmall}, nil, nil}
80+
}
81+
82+
ctx := r.Context()
83+
if to := r.FormValue("timeout"); to != "" {
84+
var cancel context.CancelFunc
85+
timeout, err := util.ParseDurationMs(to)
86+
if err != nil {
87+
return invalidParamError(err, "timeout")
88+
}
89+
90+
ctx, cancel = context.WithTimeout(ctx, convertMsToDuration(timeout))
91+
defer cancel()
92+
}
93+
94+
opts, err := extractQueryOpts(r)
95+
if err != nil {
96+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
97+
}
98+
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
99+
if err != nil {
100+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
101+
}
102+
// From now on, we must only return with a finalizer in the result (to
103+
// be called by the caller) or call qry.Close ourselves (which is
104+
// required in the case of a panic).
105+
defer func() {
106+
if result.finalizer == nil {
107+
qry.Close()
108+
}
109+
}()
110+
111+
ctx = httputil.ContextFromRequest(ctx, r)
112+
113+
res := qry.Exec(ctx)
114+
if res.Err != nil {
115+
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
116+
}
117+
118+
warnings := res.Warnings
119+
qs := q.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))
120+
121+
return apiFuncResult{&v1.QueryData{
122+
ResultType: res.Value.Type(),
123+
Result: res.Value,
124+
Stats: qs,
125+
}, nil, warnings, qry.Close}
126+
}
127+
128+
func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
129+
// TODO(Sungjin1212): Change to emit basic error (not gRPC)
130+
ts, err := util.ParseTimeParam(r, "time", q.now().Unix())
131+
if err != nil {
132+
return invalidParamError(err, "time")
133+
}
134+
135+
ctx := r.Context()
136+
if to := r.FormValue("timeout"); to != "" {
137+
var cancel context.CancelFunc
138+
timeout, err := util.ParseDurationMs(to)
139+
if err != nil {
140+
return invalidParamError(err, "timeout")
141+
}
142+
143+
ctx, cancel = context.WithDeadline(ctx, q.now().Add(convertMsToDuration(timeout)))
144+
defer cancel()
145+
}
146+
147+
opts, err := extractQueryOpts(r)
148+
if err != nil {
149+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
150+
}
151+
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
152+
if err != nil {
153+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
154+
}
155+
156+
// From now on, we must only return with a finalizer in the result (to
157+
// be called by the caller) or call qry.Close ourselves (which is
158+
// required in the case of a panic).
159+
defer func() {
160+
if result.finalizer == nil {
161+
qry.Close()
162+
}
163+
}()
164+
165+
ctx = httputil.ContextFromRequest(ctx, r)
166+
167+
res := qry.Exec(ctx)
168+
if res.Err != nil {
169+
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
170+
}
171+
172+
warnings := res.Warnings
173+
qs := q.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))
174+
175+
return apiFuncResult{&v1.QueryData{
176+
ResultType: res.Value.Type(),
177+
Result: res.Value,
178+
Stats: qs,
179+
}, nil, warnings, qry.Close}
180+
}
181+
182+
func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
183+
return func(w http.ResponseWriter, r *http.Request) {
184+
httputil.SetCORS(w, q.CORSOrigin, r)
185+
186+
result := f(r)
187+
if result.finalizer != nil {
188+
defer result.finalizer()
189+
}
190+
191+
if result.err != nil {
192+
api.RespondFromGRPCError(q.logger, w, result.err.err)
193+
return
194+
}
195+
196+
if result.data != nil {
197+
q.respond(w, r, result.data, result.warnings, r.FormValue("query"))
198+
return
199+
}
200+
w.WriteHeader(http.StatusNoContent)
201+
}
202+
}
203+
204+
func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interface{}, warnings annotations.Annotations, query string) {
205+
warn, info := warnings.AsStrings(query, 10, 10)
206+
207+
resp := &v1.Response{
208+
Status: statusSuccess,
209+
Data: data,
210+
Warnings: warn,
211+
Infos: info,
212+
}
213+
214+
codec, err := q.negotiateCodec(req, resp)
215+
if err != nil {
216+
api.RespondFromGRPCError(q.logger, w, httpgrpc.Errorf(http.StatusNotAcceptable, "%s", &apiError{errorNotAcceptable, err}))
217+
return
218+
}
219+
220+
b, err := codec.Encode(resp)
221+
if err != nil {
222+
level.Error(q.logger).Log("error marshaling response", "url", req.URL, "err", err)
223+
http.Error(w, err.Error(), http.StatusInternalServerError)
224+
return
225+
}
226+
227+
w.Header().Set("Content-Type", codec.ContentType().String())
228+
w.WriteHeader(http.StatusOK)
229+
if n, err := w.Write(b); err != nil {
230+
level.Error(q.logger).Log("error writing response", "url", req.URL, "bytesWritten", n, "err", err)
231+
}
232+
}
233+
234+
func (q *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) {
235+
for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) {
236+
for _, codec := range q.codecs {
237+
if codec.ContentType().Satisfies(clause) && codec.CanEncode(resp) {
238+
return codec, nil
239+
}
240+
}
241+
}
242+
243+
defaultCodec := q.codecs[0]
244+
if !defaultCodec.CanEncode(resp) {
245+
return nil, fmt.Errorf("cannot encode response as %s", defaultCodec.ContentType())
246+
}
247+
248+
return defaultCodec, nil
249+
}

0 commit comments

Comments
 (0)