Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(level_detection): Make log level detection configurable #14784

Merged
merged 11 commits into from
Nov 6, 2024
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3336,6 +3336,11 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# CLI flag: -validation.discover-log-levels
[discover_log_levels: <boolean> | default = true]

# Field name to use for log levels. If not set, log level would be detected
# based on pre-defined labels as mentioned above.
# CLI flag: -validation.log-level-fields
[log_level_fields: <list of strings> | default = [level LEVEL Level Severity severity SEVERITY lvl LVL Lvl]]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open to suggestions if there's a better name


# When true an ingester takes into account only the streams that it owns
# according to the ring while applying the stream limit.
# CLI flag: -ingester.use-owned-stream-count
Expand Down
167 changes: 5 additions & 162 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package distributor

import (
"bytes"
"context"
"flag"
"fmt"
Expand All @@ -12,16 +11,12 @@ import (
"strings"
"sync"
"time"
"unicode"
"unsafe"

"github.com/buger/jsonparser"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/prometheus/prometheus/model/labels"
"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/collector/pdata/plog"
"google.golang.org/grpc/codes"

"github.com/grafana/dskit/httpgrpc"
Expand Down Expand Up @@ -50,7 +45,6 @@ import (
kafka_client "github.com/grafana/loki/v3/pkg/kafka/client"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/util"
Expand All @@ -71,12 +65,6 @@ var (
rfStats = analytics.NewInt("distributor_replication_factor")
)

var allowedLabelsForLevel = map[string]struct{}{
"level": {}, "LEVEL": {}, "Level": {},
"severity": {}, "SEVERITY": {}, "Severity": {},
"lvl": {}, "LVL": {}, "Lvl": {},
}

// Config for a Distributor.
type Config struct {
// Distributors ring
Expand Down Expand Up @@ -447,6 +435,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

var validationErrors util.GroupedErrors
validationContext := d.validator.getValidationContextForTime(time.Now(), tenantID)
levelDetector := newLevelDetector(validationContext)
shouldDiscoverLevels := levelDetector.shouldDiscoverLogLevels()

func() {
sp := opentracing.SpanFromContext(ctx)
Expand Down Expand Up @@ -480,8 +470,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
pushSize := 0
prevTs := stream.Entries[0].Timestamp

shouldDiscoverLevels := validationContext.allowStructuredMetadata && validationContext.discoverLogLevels
levelFromLabel, hasLevelLabel := hasAnyLevelLabels(lbs)
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry); err != nil {
d.writeFailuresManager.Log(tenantID, err)
Expand All @@ -491,19 +479,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)
if shouldDiscoverLevels {
var logLevel string
if hasLevelLabel {
logLevel = levelFromLabel
} else if levelFromMetadata, ok := hasAnyLevelLabels(structuredMetadata); ok {
logLevel = levelFromMetadata
} else {
logLevel = detectLogLevelFromLogEntry(entry, structuredMetadata)
}
if logLevel != "" {
entry.StructuredMetadata = append(entry.StructuredMetadata, logproto.LabelAdapter{
Name: constants.LevelLabel,
Value: logLevel,
})
logLevel, ok := levelDetector.extractLogLevel(lbs, structuredMetadata, entry)
if ok {
entry.StructuredMetadata = append(entry.StructuredMetadata, logLevel)
}
}
stream.Entries[n] = entry
Expand Down Expand Up @@ -712,15 +690,6 @@ func (d *Distributor) trackDiscardedData(
}
}

func hasAnyLevelLabels(l labels.Labels) (string, bool) {
for lbl := range allowedLabelsForLevel {
if l.Has(lbl) {
return l.Get(lbl), true
}
}
return "", false
}

// shardStream shards (divides) the given stream into N smaller streams, where
// N is the sharding size for the given stream. shardSteam returns the smaller
// streams and their associated keys for hashing to ingesters.
Expand Down Expand Up @@ -1106,129 +1075,3 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l
func (d *Distributor) HealthyInstancesCount() int {
return int(d.healthyInstancesCount.Load())
}

func detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.Labels) string {
// otlp logs have a severity number, using which we are defining the log levels.
// Significance of severity number is explained in otel docs here https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber
if otlpSeverityNumberTxt := structuredMetadata.Get(push.OTLPSeverityNumber); otlpSeverityNumberTxt != "" {
otlpSeverityNumber, err := strconv.Atoi(otlpSeverityNumberTxt)
if err != nil {
return constants.LogLevelInfo
}
if otlpSeverityNumber == int(plog.SeverityNumberUnspecified) {
return constants.LogLevelUnknown
} else if otlpSeverityNumber <= int(plog.SeverityNumberTrace4) {
return constants.LogLevelTrace
} else if otlpSeverityNumber <= int(plog.SeverityNumberDebug4) {
return constants.LogLevelDebug
} else if otlpSeverityNumber <= int(plog.SeverityNumberInfo4) {
return constants.LogLevelInfo
} else if otlpSeverityNumber <= int(plog.SeverityNumberWarn4) {
return constants.LogLevelWarn
} else if otlpSeverityNumber <= int(plog.SeverityNumberError4) {
return constants.LogLevelError
} else if otlpSeverityNumber <= int(plog.SeverityNumberFatal4) {
return constants.LogLevelFatal
}
return constants.LogLevelUnknown
}

return extractLogLevelFromLogLine(entry.Line)
}

func extractLogLevelFromLogLine(log string) string {
logSlice := unsafe.Slice(unsafe.StringData(log), len(log))
var v []byte
if isJSON(log) {
v = getValueUsingJSONParser(logSlice)
} else {
v = getValueUsingLogfmtParser(logSlice)
}

switch {
case bytes.EqualFold(v, []byte("trace")), bytes.EqualFold(v, []byte("trc")):
return constants.LogLevelTrace
case bytes.EqualFold(v, []byte("debug")), bytes.EqualFold(v, []byte("dbg")):
return constants.LogLevelDebug
case bytes.EqualFold(v, []byte("info")), bytes.EqualFold(v, []byte("inf")):
return constants.LogLevelInfo
case bytes.EqualFold(v, []byte("warn")), bytes.EqualFold(v, []byte("wrn")), bytes.EqualFold(v, []byte("warning")):
return constants.LogLevelWarn
case bytes.EqualFold(v, []byte("error")), bytes.EqualFold(v, []byte("err")):
return constants.LogLevelError
case bytes.EqualFold(v, []byte("critical")):
return constants.LogLevelCritical
case bytes.EqualFold(v, []byte("fatal")):
return constants.LogLevelFatal
default:
return detectLevelFromLogLine(log)
}
}

func getValueUsingLogfmtParser(line []byte) []byte {
equalIndex := bytes.Index(line, []byte("="))
if len(line) == 0 || equalIndex == -1 {
return nil
}

d := logfmt.NewDecoder(line)
for !d.EOL() && d.ScanKeyval() {
if _, ok := allowedLabelsForLevel[string(d.Key())]; ok {
return (d.Value())
}
}
return nil
}

func getValueUsingJSONParser(log []byte) []byte {
for allowedLabel := range allowedLabelsForLevel {
l, _, _, err := jsonparser.Get(log, allowedLabel)
if err == nil {
return l
}
}
return nil
}

func isJSON(line string) bool {
var firstNonSpaceChar rune
for _, char := range line {
if !unicode.IsSpace(char) {
firstNonSpaceChar = char
break
}
}

var lastNonSpaceChar rune
for i := len(line) - 1; i >= 0; i-- {
char := rune(line[i])
if !unicode.IsSpace(char) {
lastNonSpaceChar = char
break
}
}

return firstNonSpaceChar == '{' && lastNonSpaceChar == '}'
}

func detectLevelFromLogLine(log string) string {
if strings.Contains(log, "info:") || strings.Contains(log, "INFO:") ||
strings.Contains(log, "info") || strings.Contains(log, "INFO") {
return constants.LogLevelInfo
}
if strings.Contains(log, "err:") || strings.Contains(log, "ERR:") ||
strings.Contains(log, "error") || strings.Contains(log, "ERROR") {
return constants.LogLevelError
}
if strings.Contains(log, "warn:") || strings.Contains(log, "WARN:") ||
strings.Contains(log, "warning") || strings.Contains(log, "WARNING") {
return constants.LogLevelWarn
}
if strings.Contains(log, "CRITICAL:") || strings.Contains(log, "critical:") {
return constants.LogLevelCritical
}
if strings.Contains(log, "debug:") || strings.Contains(log, "DEBUG:") {
return constants.LogLevelDebug
}
return constants.LogLevelUnknown
}
Loading
Loading