diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ee05edfb42b..bd2ae7c12de5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,10 +42,12 @@ * [10956](https://github.com/grafana/loki/pull/10956) **jeschkies** do not wrap requests but send pure Protobuf from frontend v2 via scheduler to querier when `-frontend.encoding=protobuf`. * [10417](https://github.com/grafana/loki/pull/10417) **jeschkies** shard `quantile_over_time` range queries using probabilistic data structures. * [11284](https://github.com/grafana/loki/pull/11284) **ashwanthgoli** Config: Adds `frontend.max-query-capacity` to tune per-tenant query capacity. +* [11545](https://github.com/grafana/loki/pull/11545) **dannykopping** Force correct memcached timeout when fetching chunks. ##### Fixes * [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var. * [11195](https://github.com/grafana/loki/pull/11195) **canuteson** Generate tsdb_shipper storage_config even if using_boltdb_shipper is false +* [11551](https://github.com/grafana/loki/pull/11551) **dannykopping** Do not reflect label names in request metrics' "route" label. ##### Changes diff --git a/docs/sources/send-data/promtail/installation.md b/docs/sources/send-data/promtail/installation.md index 3b81225b5189..80fd56d783a7 100644 --- a/docs/sources/send-data/promtail/installation.md +++ b/docs/sources/send-data/promtail/installation.md @@ -17,6 +17,10 @@ or there is a Helm chart to install it in a Kubernetes cluster. Every Grafana Loki release includes binaries for Promtail which can be found on the [Releases page](https://github.com/grafana/loki/releases) as part of the release assets. +## Install using APT or RPM package manager + +See the instructions [here](https://grafana.com/docs/loki//setup/install/local/#install-using-apt-or-rpm-package-manager). + ## Install using Docker ```bash diff --git a/go.mod b/go.mod index aadc1fbbca84..263256cf9983 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 github.com/grafana/dskit v0.0.0-20231120170505-765e343eda4f github.com/grafana/go-gelf/v2 v2.0.1 - github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 + github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0 github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 diff --git a/go.sum b/go.sum index 034356232c76..b3aec233f1ef 100644 --- a/go.sum +++ b/go.sum @@ -1001,8 +1001,8 @@ github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQa github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I= -github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPftShATouOrBVy6GaTTjgQd/VfNiZp/VXQ= -github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= +github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0 h1:aLBiDMjTtXx2800iCIp+8kdjIlvGX0MF/zICQMQO2qU= +github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd h1:PpuIBO5P3e9hpqBD0O/HjhShYuM6XE0i/lbE6J94kww= diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 89490d479e34..8d951ad64c94 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -207,6 +207,8 @@ func (q *query) resultLength(res promql_parser.Value) int { return r.TotalSamples() case logqlmodel.Streams: return int(r.Lines()) + case ProbabilisticQuantileMatrix: + return len(r) default: // for `scalar` or `string` or any other return type, we just return `0` as result length. return 0 @@ -361,7 +363,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture) return q.JoinSampleVector(next, ts, vec, stepEvaluator, maxSeries) case ProbabilisticQuantileVector: - return JoinQuantileSketchVector(next, vec, stepEvaluator) + return JoinQuantileSketchVector(next, vec, stepEvaluator, q.params) default: return nil, fmt.Errorf("unsupported result type: %T", r) } diff --git a/pkg/logql/quantile_over_time_sketch.go b/pkg/logql/quantile_over_time_sketch.go index 94aea83dcd90..121a3e551133 100644 --- a/pkg/logql/quantile_over_time_sketch.go +++ b/pkg/logql/quantile_over_time_sketch.go @@ -2,6 +2,7 @@ package logql import ( "fmt" + "math" "time" "github.com/prometheus/prometheus/model/labels" @@ -12,6 +13,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/sketch" "github.com/grafana/loki/pkg/logqlmodel" + "github.com/grafana/loki/pkg/queue" ) const ( @@ -80,6 +82,10 @@ func (ProbabilisticQuantileMatrix) String() string { func (ProbabilisticQuantileMatrix) Type() promql_parser.ValueType { return QuantileSketchMatrixType } +func (m ProbabilisticQuantileMatrix) Release() { + quantileVectorPool.Put(m) +} + func (m ProbabilisticQuantileMatrix) ToProto() *logproto.QuantileSketchMatrix { values := make([]*logproto.QuantileSketchVector, len(m)) for i, vec := range m { @@ -157,8 +163,6 @@ func newQuantileSketchIterator( } } -//batch - type ProbabilisticQuantileSample struct { T int64 F sketch.QuantileSketch @@ -231,18 +235,26 @@ func (r *quantileSketchBatchRangeVectorIterator) agg(samples []promql.FPoint) sk return s } +// quantileVectorPool slice of ProbabilisticQuantileVector [64, 128, 256, ..., 65536] +var quantileVectorPool = queue.NewSlicePool[ProbabilisticQuantileVector](1<<6, 1<<16, 2) + // JoinQuantileSketchVector joins the results from stepEvaluator into a ProbabilisticQuantileMatrix. -func JoinQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluator) (promql_parser.Value, error) { +func JoinQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluator, params Params) (promql_parser.Value, error) { vec := r.QuantileSketchVec() if stepEvaluator.Error() != nil { return nil, stepEvaluator.Error() } - result := make([]ProbabilisticQuantileVector, 0) + stepCount := int(math.Ceil(float64(params.End().Sub(params.Start()).Nanoseconds()) / float64(params.Step().Nanoseconds()))) + if stepCount <= 0 { + stepCount = 1 + } + + // The result is released to the pool when the matrix is serialized. + result := quantileVectorPool.Get(stepCount) for next { result = append(result, vec) - next, _, r = stepEvaluator.Next() vec = r.QuantileSketchVec() if stepEvaluator.Error() != nil { diff --git a/pkg/logql/quantile_over_time_sketch_test.go b/pkg/logql/quantile_over_time_sketch_test.go index 9a9ff1b603eb..33ecd2dc5abd 100644 --- a/pkg/logql/quantile_over_time_sketch_test.go +++ b/pkg/logql/quantile_over_time_sketch_test.go @@ -2,7 +2,10 @@ package logql import ( "errors" + "fmt" + "math/rand" "testing" + "time" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" @@ -65,7 +68,7 @@ func TestJoinQuantileSketchVectorError(t *testing.T) { ev := errorStepEvaluator{ err: errors.New("could not evaluate"), } - _, err := JoinQuantileSketchVector(true, result, ev) + _, err := JoinQuantileSketchVector(true, result, ev, LiteralParams{}) require.ErrorContains(t, err, "could not evaluate") } @@ -107,3 +110,81 @@ func (e errorStepEvaluator) Error() error { } func (e errorStepEvaluator) Explain(Node) {} + +func BenchmarkJoinQuantileSketchVector(b *testing.B) { + results := make([]ProbabilisticQuantileVector, 100) + for i := range results { + results[i] = make(ProbabilisticQuantileVector, 10) + for j := range results[i] { + results[i][j] = ProbabilisticQuantileSample{ + T: int64(i), + F: newRandomSketch(), + Metric: []labels.Label{{Name: "foo", Value: fmt.Sprintf("bar-%d", j)}}, + } + } + } + + ev := &sliceStepEvaluator{ + slice: results, + cur: 1, + } + + // (end - start) / step == len(results) + params := LiteralParams{ + start: time.Unix(0, 0), + end: time.Unix(int64(len(results)), 0), + step: time.Second, + } + + b.ReportAllocs() + b.ResetTimer() + + for n := 0; n < b.N; n++ { + // Reset step evaluator + ev.cur = 1 + r, err := JoinQuantileSketchVector(true, results[0], ev, params) + require.NoError(b, err) + r.(ProbabilisticQuantileMatrix).Release() + } +} + +func newRandomSketch() sketch.QuantileSketch { + r := rand.New(rand.NewSource(42)) + s := sketch.NewDDSketch() + for i := 0; i < 1000; i++ { + _ = s.Add(r.Float64()) + } + return s +} + +type sliceStepEvaluator struct { + err error + slice []ProbabilisticQuantileVector + cur int +} + +// Close implements StepEvaluator. +func (*sliceStepEvaluator) Close() error { + return nil +} + +// Error implements StepEvaluator. +func (ev *sliceStepEvaluator) Error() error { + return ev.err +} + +// Explain implements StepEvaluator. +func (*sliceStepEvaluator) Explain(Node) {} + +// Next implements StepEvaluator. +func (ev *sliceStepEvaluator) Next() (ok bool, ts int64, r StepResult) { + if ev.cur >= len(ev.slice) { + return false, 0, nil + } + + r = ev.slice[ev.cur] + ts = ev.slice[ev.cur][0].T + ev.cur++ + ok = ev.cur < len(ev.slice) + return +} diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 5e4a322418a3..d5a324779bf2 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -799,7 +799,12 @@ func (c Codec) Path(r queryrangebase.Request) string { case *LokiSeriesRequest: return "loki/api/v1/series" case *LabelRequest: - return request.Path() // NOTE: this could be either /label or /label/{name}/values endpoint. So forward the original path as it is. + if request.Values { + // This request contains user-generated input in the URL, which is not safe to reflect in the route path. + return "loki/api/v1/label/values" + } + + return request.Path() case *LokiInstantRequest: return "/loki/api/v1/query" case *logproto.IndexStatsRequest: diff --git a/pkg/querier/queryrange/marshal.go b/pkg/querier/queryrange/marshal.go index 6f6e99786536..994b8d682c6c 100644 --- a/pkg/querier/queryrange/marshal.go +++ b/pkg/querier/queryrange/marshal.go @@ -121,7 +121,9 @@ func ResultToResponse(result logqlmodel.Result, params logql.Params) (queryrange sk, err := data.ToProto() return &TopKSketchesResponse{Response: sk}, err case logql.ProbabilisticQuantileMatrix: - return &QuantileSketchResponse{Response: data.ToProto()}, nil + r := data.ToProto() + data.Release() + return &QuantileSketchResponse{Response: r}, nil } return nil, fmt.Errorf("unsupported data type: %T", result.Data) diff --git a/vendor/github.com/grafana/gomemcache/memcache/memcache.go b/vendor/github.com/grafana/gomemcache/memcache/memcache.go index 67288a12fb78..c5962d092e0f 100644 --- a/vendor/github.com/grafana/gomemcache/memcache/memcache.go +++ b/vendor/github.com/grafana/gomemcache/memcache/memcache.go @@ -504,30 +504,31 @@ func (c *Client) withKeyAddr(key string, fn func(net.Addr) error) (err error) { return fn(addr) } -func (c *Client) withAddrRw(addr net.Addr, fn func(*bufio.ReadWriter) error) (err error) { +func (c *Client) withAddrRw(addr net.Addr, fn func(*conn) error) (err error) { cn, err := c.getConn(addr) if err != nil { return err } defer cn.condRelease(&err) - return fn(cn.rw) + return fn(cn) } -func (c *Client) withKeyRw(key string, fn func(*bufio.ReadWriter) error) error { +func (c *Client) withKeyRw(key string, fn func(*conn) error) error { return c.withKeyAddr(key, func(addr net.Addr) error { return c.withAddrRw(addr, fn) }) } func (c *Client) getFromAddr(addr net.Addr, keys []string, opts *Options, cb func(*Item)) error { - return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { + return c.withAddrRw(addr, func(conn *conn) error { + rw := conn.rw if _, err := fmt.Fprintf(rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil { return err } if err := rw.Flush(); err != nil { return err } - if err := c.parseGetResponse(rw.Reader, opts, cb); err != nil { + if err := c.parseGetResponse(rw.Reader, conn, opts, cb); err != nil { return err } return nil @@ -536,7 +537,8 @@ func (c *Client) getFromAddr(addr net.Addr, keys []string, opts *Options, cb fun // flushAllFromAddr send the flush_all command to the given addr func (c *Client) flushAllFromAddr(addr net.Addr) error { - return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { + return c.withAddrRw(addr, func(conn *conn) error { + rw := conn.rw if _, err := fmt.Fprintf(rw, "flush_all\r\n"); err != nil { return err } @@ -559,7 +561,8 @@ func (c *Client) flushAllFromAddr(addr net.Addr) error { // ping sends the version command to the given addr func (c *Client) ping(addr net.Addr) error { - return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { + return c.withAddrRw(addr, func(conn *conn) error { + rw := conn.rw if _, err := fmt.Fprintf(rw, "version\r\n"); err != nil { return err } @@ -582,7 +585,8 @@ func (c *Client) ping(addr net.Addr) error { } func (c *Client) touchFromAddr(addr net.Addr, keys []string, expiration int32) error { - return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { + return c.withAddrRw(addr, func(conn *conn) error { + rw := conn.rw for _, key := range keys { if _, err := fmt.Fprintf(rw, "touch %s %d\r\n", key, expiration); err != nil { return err @@ -653,9 +657,12 @@ func (c *Client) GetMulti(keys []string, opts ...Option) (map[string]*Item, erro // parseGetResponse reads a GET response from r and calls cb for each // read and allocated Item -func (c *Client) parseGetResponse(r *bufio.Reader, opts *Options, cb func(*Item)) error { +func (c *Client) parseGetResponse(r *bufio.Reader, conn *conn, opts *Options, cb func(*Item)) error { for { + // extend deadline before each additional call, otherwise all cumulative calls use the same overall deadline + conn.extendDeadline() line, err := r.ReadSlice('\n') + if err != nil { return err } @@ -852,15 +859,15 @@ func writeExpectf(rw *bufio.ReadWriter, expect []byte, format string, args ...in // Delete deletes the item with the provided key. The error ErrCacheMiss is // returned if the item didn't already exist in the cache. func (c *Client) Delete(key string) error { - return c.withKeyRw(key, func(rw *bufio.ReadWriter) error { - return writeExpectf(rw, resultDeleted, "delete %s\r\n", key) + return c.withKeyRw(key, func(conn *conn) error { + return writeExpectf(conn.rw, resultDeleted, "delete %s\r\n", key) }) } // DeleteAll deletes all items in the cache. func (c *Client) DeleteAll() error { - return c.withKeyRw("", func(rw *bufio.ReadWriter) error { - return writeExpectf(rw, resultDeleted, "flush_all\r\n") + return c.withKeyRw("", func(conn *conn) error { + return writeExpectf(conn.rw, resultDeleted, "flush_all\r\n") }) } @@ -891,7 +898,8 @@ func (c *Client) Decrement(key string, delta uint64) (newValue uint64, err error func (c *Client) incrDecr(verb, key string, delta uint64) (uint64, error) { var val uint64 - err := c.withKeyRw(key, func(rw *bufio.ReadWriter) error { + err := c.withKeyRw(key, func(conn *conn) error { + rw := conn.rw line, err := writeReadLine(rw, "%s %s %d\r\n", verb, key, delta) if err != nil { return err diff --git a/vendor/modules.txt b/vendor/modules.txt index 23e4a1802fb2..5c67fbbcacb1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -900,7 +900,7 @@ github.com/grafana/dskit/user # github.com/grafana/go-gelf/v2 v2.0.1 ## explicit; go 1.17 github.com/grafana/go-gelf/v2/gelf -# github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 +# github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0 ## explicit; go 1.18 github.com/grafana/gomemcache/memcache # github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608 => ./pkg/push