Skip to content

Commit

Permalink
Add metrics.go log line when reading from an ingester (#11571)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:

Add a metrics.go log line when a querier reads from an ingester. The
output is like this:

```
level=info ts=2024-01-03T13:59:06.919965546Z caller=metrics.go:275 component=ingester org_id=fake traceID=0b216e7d014c5f87 latency=fast query_type=ingester_series start=2024-01-03T13:00:00Z end=2024-01-03T13:59:06.834Z start_delta=59m6.91996413s end_delta=85.964255ms length=59m6.834s duration=110.792µs status=200 query=<omitted> query_hash=2166136261 total_entries=9
```

**Checklist**
- [X] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [X] Tests updated
- [X] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a)

---------

Signed-off-by: Michel Hollands <[email protected]>
  • Loading branch information
MichelHollands authored Jan 5, 2024
1 parent 6897056 commit f04d0db
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 32 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

##### Enhancements

* [11363](https://github.com/grafana/loki/pull/11477) **MichelHollands**: support GET for /ingester/shutdown
* [11571](https://github.com/grafana/loki/pull/11571) **MichelHollands**: Add a metrics.go log line for requests from querier to ingester
* [11477](https://github.com/grafana/loki/pull/11477) **MichelHollands**: support GET for /ingester/shutdown
* [11363](https://github.com/grafana/loki/pull/11363) **kavirajk**: bugfix(memcached): Make memcached batch fetch truely context aware.
* [11319](https://github.com/grafana/loki/pull/11319) **someStrangerFromTheAbyss**: Helm: Add extraContainers to the write pods.
* [11243](https://github.com/grafana/loki/pull/11243) **kavirajk**: Inflight-logging: Add extra metadata to inflight requests logging.
Expand Down
37 changes: 33 additions & 4 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,10 +871,13 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
}

// Query the ingests for log streams matching a set of matchers.
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) (err error) {
// initialize stats collection for ingester queries.
_, ctx := stats.NewContext(queryServer.Context())

start := time.Now().UTC()
var lines int32

if req.Plan == nil {
parsed, err := syntax.ParseLogSelector(req.Selector, true)
if err != nil {
Expand All @@ -885,6 +888,17 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
}
}

defer func() {
status := "successful"
if err != nil {
status = "failed"
}
statsCtx := stats.FromContext(ctx)
execTime := time.Since(start)
logql.RecordIngesterStreamsQueryMetrics(ctx, i.logger, req.Start, req.End, req.Selector, status, req.Limit, lines, req.Shards,
statsCtx.Result(execTime, time.Duration(0), 0))
}()

instanceID, err := tenant.TenantID(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -926,14 +940,17 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
batchLimit = -1
}

return sendBatches(ctx, it, queryServer, batchLimit)
lines, err = sendBatches(ctx, it, queryServer, batchLimit)
return err
}

// QuerySample the ingesters for series from logs matching a set of matchers.
func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error {
func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) (err error) {
// initialize stats collection for ingester queries.
_, ctx := stats.NewContext(queryServer.Context())
sp := opentracing.SpanFromContext(ctx)
start := time.Now().UTC()
var lines int32

// If the plan is empty we want all series to be returned.
if req.Plan == nil {
Expand All @@ -946,6 +963,17 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
}
}

defer func() {
status := "successful"
if err != nil {
status = "failed"
}
statsCtx := stats.FromContext(ctx)
execTime := time.Since(start)
logql.RecordIngesterSeriesQueryMetrics(ctx, i.logger, req.Start, req.End, req.Selector, status, lines, req.Shards,
statsCtx.Result(execTime, time.Duration(0), 0))
}()

instanceID, err := tenant.TenantID(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -984,7 +1012,8 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log

defer util.LogErrorWithContext(ctx, "closing iterator", it.Close)

return sendSampleBatches(ctx, it, queryServer)
lines, err = sendSampleBatches(ctx, it, queryServer)
return err
}

// asyncStoreMaxLookBack returns a max look back period only if active index type is one of async index stores like `boltdb-shipper` and `tsdb`.
Expand Down
24 changes: 14 additions & 10 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,8 +949,9 @@ type QuerierQueryServer interface {
Send(res *logproto.QueryResponse) error
}

func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit int32) error {
func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit int32) (int32, error) {
stats := stats.FromContext(ctx)
var lines int32

// send until the limit is reached.
for limit != 0 && !isDone(ctx) {
Expand All @@ -960,7 +961,7 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ
}
batch, batchSize, err := iter.ReadBatch(i, fetchSize)
if err != nil {
return err
return lines, err
}

if limit > 0 {
Expand All @@ -969,46 +970,49 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ

stats.AddIngesterBatch(int64(batchSize))
batch.Stats = stats.Ingester()
lines += int32(batchSize)

if isDone(ctx) {
break
}
if err := queryServer.Send(batch); err != nil && err != context.Canceled {
return err
return lines, err
}

// We check this after sending an empty batch to make sure stats are sent
if len(batch.Streams) == 0 {
return nil
return lines, err
}

stats.Reset()
}
return nil
return lines, nil
}

func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer logproto.Querier_QuerySampleServer) error {
func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer logproto.Querier_QuerySampleServer) (int32, error) {
var lines int32
sp := opentracing.SpanFromContext(ctx)

stats := stats.FromContext(ctx)
for !isDone(ctx) {
batch, size, err := iter.ReadSampleBatch(it, queryBatchSampleSize)
if err != nil {
return err
return lines, err
}

stats.AddIngesterBatch(int64(size))
batch.Stats = stats.Ingester()
lines += int32(size)
if isDone(ctx) {
break
}
if err := queryServer.Send(batch); err != nil && err != context.Canceled {
return err
return lines, err
}

// We check this after sending an empty batch to make sure stats are sent
if len(batch.Series) == 0 {
return nil
return lines, nil
}

stats.Reset()
Expand All @@ -1017,7 +1021,7 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer
}
}

return nil
return lines, nil
}

func shouldConsiderStream(stream *stream, reqFrom, reqThrough time.Time) bool {
Expand Down
20 changes: 10 additions & 10 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,16 +614,16 @@ func Test_Iterator(t *testing.T) {

// assert the order is preserved.
var res *logproto.QueryResponse
require.NoError(t,
sendBatches(context.TODO(), it,
fakeQueryServer(
func(qr *logproto.QueryResponse) error {
res = qr
return nil
},
),
int32(2)),
)
lines, err := sendBatches(context.TODO(), it,
fakeQueryServer(
func(qr *logproto.QueryResponse) error {
res = qr
return nil
},
),
int32(2))
require.NoError(t, err)
require.Equal(t, int32(2), lines)
require.Equal(t, 2, len(res.Streams))
// each entry translated into a unique stream
require.Equal(t, 1, len(res.Streams[0].Entries))
Expand Down
74 changes: 67 additions & 7 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import (
)

const (
QueryTypeMetric = "metric"
QueryTypeFilter = "filter"
QueryTypeLimited = "limited"
QueryTypeLabels = "labels"
QueryTypeSeries = "series"
QueryTypeStats = "stats"
QueryTypeVolume = "volume"
QueryTypeMetric = "metric"
QueryTypeFilter = "filter"
QueryTypeLimited = "limited"
QueryTypeLabels = "labels"
QueryTypeSeries = "series"
QueryTypeIngesterStreams = "ingester_streams"
QueryTypeIngesterSeries = "ingester_series"
QueryTypeStats = "stats"
QueryTypeVolume = "volume"

latencyTypeSlow = "slow"
latencyTypeFast = "fast"
Expand Down Expand Up @@ -247,6 +249,64 @@ func PrintMatches(matches []string) string {
return strings.Join(matches, ":")
}

func RecordIngesterStreamsQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, status string, limit uint32, returnedLines int32, shards []string, stats logql_stats.Result) {
recordIngesterQueryMetrics(ctx, QueryTypeIngesterStreams, log, start, end, query, status, &limit, returnedLines, shards, stats)
}

func RecordIngesterSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, status string, returnedLines int32, shards []string, stats logql_stats.Result) {
recordIngesterQueryMetrics(ctx, QueryTypeIngesterSeries, log, start, end, query, status, nil, returnedLines, shards, stats)
}

func recordIngesterQueryMetrics(ctx context.Context, queryType string, log log.Logger, start, end time.Time, query string, status string, limit *uint32, returnedLines int32, shards []string, stats logql_stats.Result) {
var (
logger = fixLogger(ctx, log)
latencyType = latencyTypeFast
)

// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
if stats.Summary.ExecTime > slowQueryThresholdSecond {
latencyType = latencyTypeSlow
}

logValues := make([]interface{}, 0, 23)
logValues = append(logValues,
"latency", latencyType,
"query_type", queryType,
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"query", query,
"query_hash", util.HashedQuery(query),
"returned_lines", returnedLines,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"total_bytes_structured_metadata", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalStructuredMetadataBytesProcessed)), " ", "", 1),
"lines_per_second", stats.Summary.LinesProcessedPerSecond,
"total_lines", stats.Summary.TotalLinesProcessed,
"post_filter_lines", stats.Summary.TotalPostFilterLines,
"total_entries", stats.Summary.TotalEntriesReturned,
"chunk_refs_fetch_time", stats.ChunkRefsFetchTime())

if limit != nil {
logValues = append(logValues,
"limit", *limit)
}
shard := extractShard(shards)
if shard != nil {
logValues = append(logValues,
"shard_num", shard.Shard,
"shard_count", shard.Of,
)
}

level.Info(logger).Log(logValues...)
}

func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, match []string, status string, shards []string, stats logql_stats.Result) {
var (
logger = fixLogger(ctx, log)
Expand Down

0 comments on commit f04d0db

Please sign in to comment.