feat: add per-tenant time sharding for long out-of-order ingestion (#…
na-- authored Nov 6, 2024
1 parent 7669385 commit 0d6d68d
Showing 5 changed files with 413 additions and 13 deletions.
12 changes: 12 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3712,6 +3712,18 @@ shard_streams:
# CLI flag: -shard-streams.enabled
[enabled: <boolean> | default = true]

# Automatically shard streams by adding a __time_shard__ label, with values
# calculated by bucketing the log timestamps into intervals of MaxChunkAge/2.
# This allows the out-of-order ingestion of very old logs. If both flags are
# enabled, time-based sharding will happen before rate-based sharding.
# CLI flag: -shard-streams.time-sharding-enabled
[time_sharding_enabled: <boolean> | default = false]

# Logs with timestamps more recent than now minus this duration will not be
# time-sharded.
# CLI flag: -shard-streams.time-sharding-ignore-recent
[time_sharding_ignore_recent: <duration> | default = 40m]

# Whether to log sharding streams behavior or not. Not recommended for
# production environments.
# CLI flag: -shard-streams.logging-enabled
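
A quick illustration of the two flags above (not part of the commit): assuming the ingester's default max_chunk_age of 2h, the shard width MaxChunkAge/2 is 1h, and the __time_shard__ value for a log line is the Unix start and end of the 1h bucket its timestamp falls into. The start_end format matches the distributor code added further down in this commit.

package main

import (
    "fmt"
    "time"
)

func main() {
    shardLen := 2 * time.Hour / 2 // MaxChunkAge / 2, assuming the 2h default
    ts := time.Date(2024, 11, 6, 10, 23, 45, 0, time.UTC)
    start := ts.Truncate(shardLen) // 2024-11-06T10:00:00Z
    end := start.Add(shardLen)     // 2024-11-06T11:00:00Z
    fmt.Printf("__time_shard__=%d_%d\n", start.Unix(), end.Unix())
    // Output: __time_shard__=1730887200_1730890800
}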
137 changes: 125 additions & 12 deletions pkg/distributor/distributor.go
@@ -6,6 +6,7 @@ import (
"fmt"
"math"
"net/http"
"slices"
"sort"
"strconv"
"strings"
@@ -58,6 +59,8 @@ const (
ringKey = "distributor"

ringAutoForgetUnhealthyPeriods = 2

timeShardLabel = "__time_shard__"
)

var (
@@ -120,6 +123,7 @@ type Distributor struct {
services.Service

cfg Config
ingesterCfg ingester.Config
logger log.Logger
clientCfg client.Config
tenantConfigs *runtime.TenantConfigs
@@ -175,6 +179,7 @@ type Distributor struct {
// New creates a distributor.
func New(
cfg Config,
ingesterCfg ingester.Config,
clientCfg client.Config,
configs *runtime.TenantConfigs,
ingestersRing ring.ReadRing,
@@ -233,6 +238,7 @@ func New(

d := &Distributor{
cfg: cfg,
ingesterCfg: ingesterCfg,
logger: logger,
clientCfg: clientCfg,
tenantConfigs: configs,
@@ -434,10 +440,42 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedLineCount := 0

var validationErrors util.GroupedErrors
validationContext := d.validator.getValidationContextForTime(time.Now(), tenantID)

now := time.Now()
validationContext := d.validator.getValidationContextForTime(now, tenantID)
levelDetector := newLevelDetector(validationContext)
shouldDiscoverLevels := levelDetector.shouldDiscoverLogLevels()

shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
maybeShardByRate := func(stream logproto.Stream, pushSize int) {
if shardStreamsCfg.Enabled {
streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
return
}
streams = append(streams, KeyedStream{
HashKey: lokiring.TokenFor(tenantID, stream.Labels),
Stream: stream,
})
}

maybeShardStreams := func(stream logproto.Stream, labels labels.Labels, pushSize int) {
if !shardStreamsCfg.TimeShardingEnabled {
maybeShardByRate(stream, pushSize)
return
}

ignoreRecentFrom := now.Add(-shardStreamsCfg.TimeShardingIgnoreRecent)
streamsByTime, ok := shardStreamByTime(stream, labels, d.ingesterCfg.MaxChunkAge/2, ignoreRecentFrom)
if !ok {
maybeShardByRate(stream, pushSize)
return
}

for _, ts := range streamsByTime {
maybeShardByRate(ts.Stream, ts.linesTotalLen)
}
}

func() {
sp := opentracing.SpanFromContext(ctx)
if sp != nil {
@@ -446,6 +484,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
sp.LogKV("event", "finished to validate request")
}()
}

for _, stream := range req.Streams {
// Return early if stream does not contain any entries
if len(stream.Entries) == 0 {
@@ -512,15 +551,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}

shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
if shardStreamsCfg.Enabled {
streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
} else {
streams = append(streams, KeyedStream{
HashKey: lokiring.TokenFor(tenantID, stream.Labels),
Stream: stream,
})
}
maybeShardStreams(stream, lbs, pushSize)
}
}()
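
// Illustrative example, not part of the commit: with both shard_streams.enabled
// and time_sharding_enabled set for a tenant, a push for {app="migration"} whose
// entries are from 09:10 UTC and 10:20 UTC on 2024-11-06, plus one line logged a
// few seconds ago (inside the default 40m ignore-recent window), is split by
// maybeShardStreams into
//
//   {__time_shard__="1730883600_1730887200", app="migration"}  the 09:00-10:00 bucket
//   {__time_shard__="1730887200_1730890800", app="migration"}  the 10:00-11:00 bucket
//   {app="migration"}                                          the recent remainder
//
// assuming a 1h shard width (MaxChunkAge/2 with a 2h max_chunk_age). Each of these
// sub-streams is then handed to maybeShardByRate, so rate-based sharding still
// applies per time shard.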

@@ -534,8 +565,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

now := time.Now()

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.BlockedIngestion)

@@ -690,6 +719,90 @@ func (d *Distributor) trackDiscardedData(
}
}

type streamWithTimeShard struct {
logproto.Stream
linesTotalLen int
}

// shardStreamByTime shards the stream into multiple sub-streams based on the log
// timestamps, with no new allocations for the log entries. It sorts them
// in-place in the given stream object (so it may modify it!) and references
// sub-slices of the same stream.Entries slice.
//
// If the second result is false, it means that either there were no logs in the
// stream, or all of the logs in the stream occurred after the given value of
// ignoreLogsFrom, so there was no need to shard and the original `stream` value
// can be used. However, due to the in-place sorting of the logs by timestamp, the
// stream might still have been reordered.
func shardStreamByTime(stream logproto.Stream, lbls labels.Labels, timeShardLen time.Duration, ignoreLogsFrom time.Time) ([]streamWithTimeShard, bool) {
entries := stream.Entries
entriesLen := len(entries)
if entriesLen == 0 {
return nil, false
}

slices.SortStableFunc(entries, func(a, b logproto.Entry) int { return a.Timestamp.Compare(b.Timestamp) })

// Shortcut to do no work if all of the logs are recent
if entries[0].Timestamp.After(ignoreLogsFrom) {
return nil, false
}

result := make([]streamWithTimeShard, 0, (entries[entriesLen-1].Timestamp.Sub(entries[0].Timestamp)/timeShardLen)+1)
labelBuilder := labels.NewBuilder(lbls)

startIdx := 0
for startIdx < entriesLen && entries[startIdx].Timestamp.Before(ignoreLogsFrom) /* the index is changed below */ {
timeShardStart := entries[startIdx].Timestamp.Truncate(timeShardLen)
timeShardEnd := timeShardStart.Add(timeShardLen)

timeShardCutoff := timeShardEnd
if timeShardCutoff.After(ignoreLogsFrom) {
// If the time_sharding_ignore_recent cutoff falls in the middle of this
// shard, we need to cut off the logs at that point.
timeShardCutoff = ignoreLogsFrom
}

endIdx := startIdx + 1
linesTotalLen := len(entries[startIdx].Line)
for ; endIdx < entriesLen && entries[endIdx].Timestamp.Before(timeShardCutoff); endIdx++ {
linesTotalLen += len(entries[endIdx].Line)
}

shardLbls := labelBuilder.Set(timeShardLabel, fmt.Sprintf("%d_%d", timeShardStart.Unix(), timeShardEnd.Unix())).Labels()
result = append(result, streamWithTimeShard{
Stream: logproto.Stream{
Labels: shardLbls.String(),
Hash: shardLbls.Hash(),
Entries: stream.Entries[startIdx:endIdx],
},
linesTotalLen: linesTotalLen,
})

startIdx = endIdx
}

if startIdx == entriesLen {
// We do not have any remaining entries
return result, true
}

// Append one last shard with all of the logs without a time shard
logsWithoutTimeShardLen := 0
for i := startIdx; i < entriesLen; i++ {
logsWithoutTimeShardLen += len(entries[i].Line)
}

return append(result, streamWithTimeShard{
Stream: logproto.Stream{
Labels: stream.Labels,
Hash: stream.Hash,
Entries: stream.Entries[startIdx:entriesLen],
},
linesTotalLen: logsWithoutTimeShardLen,
}), true
}
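
// The sketch below is illustrative and not part of the commit. It shows how
// shardStreamByTime might be exercised, e.g. from a test in this package; the
// labels, timestamps and the 1h shard width (MaxChunkAge/2, assuming a 2h
// max_chunk_age) are made-up values, and the packages used (fmt, time, logproto
// and the Prometheus labels package) are the ones this file already imports.
func exampleShardStreamByTime() {
    lbls := labels.FromStrings("app", "migration")
    now := time.Now()
    stream := logproto.Stream{
        Labels: lbls.String(),
        Entries: []logproto.Entry{
            {Timestamp: now.Add(-5 * time.Hour), Line: "old line 1"},
            {Timestamp: now.Add(-4 * time.Hour), Line: "old line 2"},
            {Timestamp: now.Add(-10 * time.Minute), Line: "recent line"},
        },
    }

    // With a 1h shard width and the default 40m ignore-recent window, the two old
    // lines land in their own hourly __time_shard__ buckets, and the recent line
    // comes back in a final shard that keeps the original labels.
    shards, ok := shardStreamByTime(stream, lbls, time.Hour, now.Add(-40*time.Minute))
    if !ok {
        return // nothing to shard: the stream was empty or all lines were recent
    }
    for _, s := range shards {
        fmt.Println(s.Labels, len(s.Entries), s.linesTotalLen)
    }
}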

// shardStream shards (divides) the given stream into N smaller streams, where
// N is the sharding size for the given stream. shardStream returns the smaller
// streams and their associated keys for hashing to ingesters.
