Skip to content

Commit

Permalink
create backoff for every batch worker
Browse files (browse the repository at this point in the history)
  • Loading branch information
DmitryRomanov committed Dec 21, 2023
1 parent 2d3091c commit 47e3d9e
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 150 deletions.
38 changes: 25 additions & 13 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v3"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -133,21 +135,24 @@ type Batcher struct {
}

type (
// NOTE: this span is a diff hunk whose +/- markers were lost, so the
// pre-change and post-change lines appear back to back.

// BatcherOutFn, pre-change signature (the removed line of the hunk).
BatcherOutFn func(*WorkerData, *Batch)
// BatcherOutFn, post-change signature (the added line of the hunk): the
// output callback additionally receives a pointer to a backoff.BackOff,
// which Batcher.work creates via cfg.GetBackoff and resets before each call.
BatcherOutFn func(*WorkerData, *Batch, *backoff.BackOff)
BatcherMaintenanceFn func(*WorkerData)

BatcherOptions struct {
// Pre-change field list (the removed lines of the hunk).
PipelineName string
OutputType string
OutFn BatcherOutFn
MaintenanceFn BatcherMaintenanceFn
Controller OutputPluginController
Workers int
BatchSizeCount int
BatchSizeBytes int
FlushTimeout time.Duration
MaintenanceInterval time.Duration
MetricCtl *metric.Ctl
// Post-change field list (the added lines of the hunk): the same fields
// as above followed by three new retry settings.
PipelineName string
OutputType string
OutFn BatcherOutFn
MaintenanceFn BatcherMaintenanceFn
Controller OutputPluginController
Workers int
BatchSizeCount int
BatchSizeBytes int
FlushTimeout time.Duration
MaintenanceInterval time.Duration
MetricCtl *metric.Ctl
// Retry, RetryRetention and RetryRetentionExponentMultiplier are the
// arguments Batcher.work passes to cfg.GetBackoff (as uint64, as-is, and
// as float64 respectively) to build the backoff handed to OutFn.
Retry int
RetryRetention time.Duration
RetryRetentionExponentMultiplier int
}
)

Expand Down Expand Up @@ -197,12 +202,19 @@ func (b *Batcher) work() {

t := time.Now()
data := WorkerData(nil)
workerBackoff := cfg.GetBackoff(
b.opts.RetryRetention,
float64(b.opts.RetryRetentionExponentMultiplier),
uint64(b.opts.Retry),
)

for batch := range b.fullBatches {
b.workersInProgress.Inc()

if batch.hasIterableEvents {
now := time.Now()
b.opts.OutFn(&data, batch)
workerBackoff.Reset()
b.opts.OutFn(&data, batch, &workerBackoff)
b.batchOutFnSeconds.Observe(time.Since(now).Seconds())
}

Expand Down
5 changes: 3 additions & 2 deletions pipeline/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/cenkalti/backoff/v3"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -36,7 +37,7 @@ func TestBatcher(t *testing.T) {
wg.Add(eventCount)

batchCount := &atomic.Int32{}
batcherOut := func(workerData *WorkerData, batch *Batch) {
batcherOut := func(workerData *WorkerData, batch *Batch, workerBackoff *backoff.BackOff) {
if *workerData == nil {
*workerData = batchCount
}
Expand Down Expand Up @@ -107,7 +108,7 @@ func TestBatcherMaxSize(t *testing.T) {
wg.Add(eventCount)

batchCount := &atomic.Int32{}
batcherOut := func(workerData *WorkerData, batch *Batch) {
batcherOut := func(workerData *WorkerData, batch *Batch, workerBackoff *backoff.BackOff) {
if *workerData == nil {
*workerData = batchCount
}
Expand Down
34 changes: 15 additions & 19 deletions plugin/output/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type Plugin struct {
batcher *pipeline.Batcher
ctx context.Context
cancelFunc context.CancelFunc
backoff backoff.BackOff

query string

Expand Down Expand Up @@ -416,22 +415,19 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
}
}

p.backoff = cfg.GetBackoff(
p.config.Retention_,
float64(p.config.RetentionExponentMultiplier),
uint64(p.config.Retry),
)

p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
OutFn: p.out,
Controller: params.Controller,
Workers: p.config.WorkersCount_,
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MetricCtl: params.MetricCtl,
PipelineName: params.PipelineName,
OutputType: outPluginType,
OutFn: p.out,
Controller: params.Controller,
Workers: p.config.WorkersCount_,
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MetricCtl: params.MetricCtl,
Retry: p.config.Retry,
RetryRetention: p.config.Retention_,
RetryRetentionExponentMultiplier: p.config.RetentionExponentMultiplier,
})

p.batcher.Start(p.ctx)
Expand Down Expand Up @@ -460,7 +456,7 @@ func (d data) reset() {
}
}

func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, workerBackoff *backoff.BackOff) {
if *workerData == nil {
// we don't check the error, schema already validated in the Start
columns, _ := inferInsaneColInputs(p.config.Columns)
Expand Down Expand Up @@ -500,8 +496,8 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
}
})

p.backoff.Reset()
try := 0
(*workerBackoff).Reset()
err := backoff.Retry(func() error {
requestID := p.requestID.Inc()
clickhouse := p.getInstance(requestID, try)
Expand All @@ -516,7 +512,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
)
}
return err
}, p.backoff)
}, *workerBackoff)

if err != nil {
var errLogFunc func(msg string, fields ...zap.Field)
Expand Down
37 changes: 16 additions & 21 deletions plugin/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type Plugin struct {
batcher *pipeline.Batcher
controller pipeline.OutputPluginController
mu *sync.Mutex
backoff backoff.BackOff

// plugin metrics

Expand Down Expand Up @@ -229,28 +228,25 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP

p.logger.Info("starting batcher", zap.Duration("timeout", p.config.BatchFlushTimeout_))
p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
OutFn: p.out,
MaintenanceFn: p.maintenance,
Controller: p.controller,
Workers: p.config.WorkersCount_,
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MaintenanceInterval: time.Minute,
MetricCtl: params.MetricCtl,
PipelineName: params.PipelineName,
OutputType: outPluginType,
OutFn: p.out,
MaintenanceFn: p.maintenance,
Controller: p.controller,
Workers: p.config.WorkersCount_,
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MaintenanceInterval: time.Minute,
MetricCtl: params.MetricCtl,
Retry: p.config.Retry,
RetryRetention: p.config.Retention_,
RetryRetentionExponentMultiplier: p.config.RetentionExponentMultiplier,
})

ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

p.backoff = cfg.GetBackoff(
p.config.Retention_,
float64(p.config.RetentionExponentMultiplier),
uint64(p.config.Retry),
)

p.batcher.Start(ctx)
}

Expand All @@ -268,7 +264,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.indexingErrorsMetric = ctl.RegisterCounter("output_elasticsearch_index_error", "Number of elasticsearch indexing errors").WithLabelValues()
}

func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, workerBackoff *backoff.BackOff) {
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
Expand All @@ -286,15 +282,14 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
data.outBuf = p.appendEvent(data.outBuf, event)
})

p.backoff.Reset()
err := backoff.Retry(func() error {
err := p.send(data.outBuf)
if err != nil {
p.sendErrorMetric.WithLabelValues().Inc()
p.logger.Error("can't send to the elastic, will try other endpoint", zap.Error(err))
}
return err
}, p.backoff)
}, *workerBackoff)

if err != nil {
var errLogFunc func(args ...interface{})
Expand Down
3 changes: 2 additions & 1 deletion plugin/output/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v3"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/logger"
Expand Down Expand Up @@ -181,7 +182,7 @@ func (p *Plugin) Out(event *pipeline.Event) {
p.batcher.Add(event)
}

func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, workerBackoff *backoff.BackOff) {
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
Expand Down
38 changes: 17 additions & 21 deletions plugin/output/gelf/gelf.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type Plugin struct {
avgEventSize int
batcher *pipeline.Batcher
controller pipeline.OutputPluginController
backoff backoff.BackOff

// plugin metrics

Expand Down Expand Up @@ -204,12 +203,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.config = config.(*Config)
p.registerMetrics(params.MetricCtl)

p.backoff = cfg.GetBackoff(
p.config.Retention_,
float64(p.config.RetentionExponentMultiplier),
uint64(p.config.Retry),
)

p.config.hostField = pipeline.ByteToStringUnsafe(p.formatExtraField(nil, p.config.HostField))
p.config.shortMessageField = pipeline.ByteToStringUnsafe(p.formatExtraField(nil, p.config.ShortMessageField))
p.config.defaultShortMessageValue = strings.TrimSpace(p.config.DefaultShortMessageValue)
Expand All @@ -223,17 +216,20 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.config.levelField = pipeline.ByteToStringUnsafe(p.formatExtraField(nil, p.config.LevelField))

p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
OutFn: p.out,
MaintenanceFn: p.maintenance,
Controller: p.controller,
Workers: p.config.WorkersCount_,
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MaintenanceInterval: p.config.ReconnectInterval_,
MetricCtl: params.MetricCtl,
PipelineName: params.PipelineName,
OutputType: outPluginType,
OutFn: p.out,
MaintenanceFn: p.maintenance,
Controller: p.controller,
Workers: p.config.WorkersCount_,
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MaintenanceInterval: p.config.ReconnectInterval_,
MetricCtl: params.MetricCtl,
Retry: p.config.Retry,
RetryRetention: p.config.Retention_,
RetryRetentionExponentMultiplier: p.config.RetentionExponentMultiplier,
})

p.batcher.Start(context.TODO())
Expand All @@ -251,7 +247,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.sendErrorMetric = ctl.RegisterCounter("output_gelf_send_error", "Total GELF send errors")
}

func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, workerBackoff *backoff.BackOff) {
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
Expand All @@ -277,7 +273,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
data.outBuf = outBuf
data.encodeBuf = encodeBuf

p.backoff.Reset()
(*workerBackoff).Reset()
err := backoff.Retry(func() error {
if data.gelf == nil {
p.logger.Infof("connecting to gelf address=%s", p.config.Endpoint)
Expand All @@ -303,7 +299,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
}

return nil
}, p.backoff)
}, *workerBackoff)

if err != nil {
var errLogFunc func(args ...interface{})
Expand Down
Loading

0 comments on commit 47e3d9e

Please sign in to comment.