Skip to content

Commit

Permalink
BatcherBackoff
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Jan 19, 2024
1 parent da1908c commit 3219f31
Show file tree
Hide file tree
Showing 15 changed files with 247 additions and 199 deletions.
35 changes: 35 additions & 0 deletions pipeline/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package pipeline

import (
"time"

"github.com/cenkalti/backoff/v3"
"github.com/ozontech/file.d/cfg"
)

type BatcherBackoff struct {
outFn BatcherOutFn
backoff backoff.BackOff
onRetryError func(err error)
}

type BackoffOpts struct {
MinRetention time.Duration
Multiplier float64
AttemptNum uint64
}

func NewBatcherBackoff(batcherOutFn BatcherOutFn, opts BackoffOpts, onRetryError func(err error)) *BatcherBackoff {
boff := cfg.GetBackoff(opts.MinRetention, opts.Multiplier, opts.AttemptNum)
return &BatcherBackoff{outFn: batcherOutFn, backoff: boff, onRetryError: onRetryError}
}

func (b *BatcherBackoff) Out(data *WorkerData, batch *Batch) {
b.backoff.Reset()
err := backoff.Retry(func() error {
return b.outFn(data, batch)
}, b.backoff)
if err != nil {
b.onRetryError(err)
}
}
47 changes: 47 additions & 0 deletions pipeline/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package pipeline

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)

func TestBackoff(t *testing.T) {
errorCount := &atomic.Int32{}
prevValue := errorCount.Load()
errorFn := func(err error) {
errorCount.Inc()
}

batcherBackoff := NewBatcherBackoff(
func(workerData *WorkerData, batch *Batch) error {
return nil
},
BackoffOpts{AttemptNum: 3},
errorFn,
)

batcherBackoff.Out(nil, nil)
assert.Equal(t, prevValue, errorCount.Load(), "wrong error count")
}

func TestBackoffWithError(t *testing.T) {
errorCount := &atomic.Int32{}
prevValue := errorCount.Load()
errorFn := func(err error) {
errorCount.Inc()
}

batcherBackoff := NewBatcherBackoff(
func(workerData *WorkerData, batch *Batch) error {
return errors.New("some error")
},
BackoffOpts{AttemptNum: 3},
errorFn,
)

batcherBackoff.Out(nil, nil)
assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count")
}
21 changes: 12 additions & 9 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v3"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -135,7 +133,7 @@ type Batcher struct {
}

type (
BatcherOutFn func(*WorkerData, *Batch, *backoff.BackOff)
BatcherOutFn func(*WorkerData, *Batch) error
BatcherMaintenanceFn func(*WorkerData)

BatcherOptions struct {
Expand All @@ -153,6 +151,7 @@ type (
Retry int
RetryRetention time.Duration
RetryRetentionExponentMultiplier int
OnRetryError func(err error)
}
)

Expand Down Expand Up @@ -202,19 +201,23 @@ func (b *Batcher) work() {

t := time.Now()
data := WorkerData(nil)
workerBackoff := cfg.GetBackoff(
b.opts.RetryRetention,
float64(b.opts.RetryRetentionExponentMultiplier),
uint64(b.opts.Retry),

workerBatcherBackoff := NewBatcherBackoff(
b.opts.OutFn,
BackoffOpts{
MinRetention: b.opts.RetryRetention,
Multiplier: float64(b.opts.RetryRetentionExponentMultiplier),
AttemptNum: uint64(b.opts.Retry),
},
b.opts.OnRetryError,
)

for batch := range b.fullBatches {
b.workersInProgress.Inc()

if batch.hasIterableEvents {
now := time.Now()
workerBackoff.Reset()
b.opts.OutFn(&data, batch, &workerBackoff)
workerBatcherBackoff.Out(&data, batch)
b.batchOutFnSeconds.Observe(time.Since(now).Seconds())
}

Expand Down
7 changes: 4 additions & 3 deletions pipeline/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/cenkalti/backoff/v3"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -37,12 +36,13 @@ func TestBatcher(t *testing.T) {
wg.Add(eventCount)

batchCount := &atomic.Int32{}
batcherOut := func(workerData *WorkerData, batch *Batch, workerBackoff *backoff.BackOff) {
batcherOut := func(workerData *WorkerData, batch *Batch) error {
if *workerData == nil {
*workerData = batchCount
}
counter := (*workerData).(*atomic.Int32)
counter.Inc()
return nil
}

seqIDs := make(map[SourceID]uint64)
Expand Down Expand Up @@ -108,12 +108,13 @@ func TestBatcherMaxSize(t *testing.T) {
wg.Add(eventCount)

batchCount := &atomic.Int32{}
batcherOut := func(workerData *WorkerData, batch *Batch, workerBackoff *backoff.BackOff) {
batcherOut := func(workerData *WorkerData, batch *Batch) error {
if *workerData == nil {
*workerData = batchCount
}
counter := (*workerData).(*atomic.Int32)
counter.Inc()
return nil
}

seqIDs := make(map[SourceID]uint64)
Expand Down
54 changes: 27 additions & 27 deletions plugin/output/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/chpool"
"github.com/ClickHouse/ch-go/proto"
"github.com/cenkalti/backoff/v3"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
Expand All @@ -21,6 +20,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

/*{ introduction
Expand Down Expand Up @@ -428,6 +428,18 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
Retry: p.config.Retry,
RetryRetention: p.config.Retention_,
RetryRetentionExponentMultiplier: p.config.RetentionExponentMultiplier,
OnRetryError: func(err error) {
var level zapcore.Level
if p.config.FatalOnFailedInsert {
level = zapcore.FatalLevel
} else {
level = zapcore.ErrorLevel
}

p.logger.Log(level, "can't insert to the table", zap.Error(err),
zap.Int("retries", p.config.Retry),
zap.String("table", p.config.Table))
},
})

p.batcher.Start(p.ctx)
Expand Down Expand Up @@ -456,7 +468,7 @@ func (d data) reset() {
}
}

func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, workerBackoff *backoff.BackOff) {
func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
if *workerData == nil {
// we don't check the error, schema already validated in the Start
columns, _ := inferInsaneColInputs(p.config.Columns)
Expand Down Expand Up @@ -496,36 +508,24 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, wor
}
})

try := 0
(*workerBackoff).Reset()
err := backoff.Retry(func() error {
var err error
for i := range p.instances {
requestID := p.requestID.Inc()
clickhouse := p.getInstance(requestID, try)
clickhouse := p.getInstance(requestID, i)
err := p.do(clickhouse, data.input)
if err != nil {
try++
p.insertErrorsMetric.WithLabelValues().Inc()
p.logger.Error(
"an attempt to insert a batch failed",
zap.Int("try", try),
zap.Error(err),
)
if err == nil {
return nil
}
return err
}, *workerBackoff)

}
if err != nil {
var errLogFunc func(msg string, fields ...zap.Field)
if p.config.FatalOnFailedInsert {
errLogFunc = p.logger.Fatal
} else {
errLogFunc = p.logger.Error
}

errLogFunc("can't insert to the table", zap.Error(err),
zap.Int("retries", p.config.Retry),
zap.String("table", p.config.Table))
p.insertErrorsMetric.WithLabelValues().Inc()
p.logger.Error(
"an attempt to insert a batch failed",
zap.Error(err),
)
}

return err
}

func (p *Plugin) do(clickhouse Clickhouse, queryInput proto.Input) error {
Expand Down
39 changes: 18 additions & 21 deletions plugin/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v3"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/logger"
Expand All @@ -20,6 +19,7 @@ import (
"github.com/valyala/fasthttp"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

/*{ introduction
Expand Down Expand Up @@ -242,6 +242,18 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
Retry: p.config.Retry,
RetryRetention: p.config.Retention_,
RetryRetentionExponentMultiplier: p.config.RetentionExponentMultiplier,
OnRetryError: func(err error) {
var level zapcore.Level
if p.config.FatalOnFailedInsert {
level = zapcore.FatalLevel
} else {
level = zapcore.ErrorLevel
}

p.logger.Log(level, "can't send to the elastic", zap.Error(err),
zap.Int("retries", p.config.Retry),
)
},
})

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -264,7 +276,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.indexingErrorsMetric = ctl.RegisterCounter("output_elasticsearch_index_error", "Number of elasticsearch indexing errors").WithLabelValues()
}

func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, workerBackoff *backoff.BackOff) {
func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
Expand All @@ -282,27 +294,12 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, wor
data.outBuf = p.appendEvent(data.outBuf, event)
})

err := backoff.Retry(func() error {
err := p.send(data.outBuf)
if err != nil {
p.sendErrorMetric.WithLabelValues().Inc()
p.logger.Error("can't send to the elastic, will try other endpoint", zap.Error(err))
}
return err
}, *workerBackoff)

err := p.send(data.outBuf)
if err != nil {
var errLogFunc func(msg string, fields ...zap.Field)
if p.config.FatalOnFailedInsert {
errLogFunc = p.logger.Fatal
} else {
errLogFunc = p.logger.Error
}

errLogFunc("can't send to the elastic", zap.Error(err),
zap.Int("retries", p.config.Retry),
)
p.sendErrorMetric.WithLabelValues().Inc()
p.logger.Error("can't send to the elastic, will try other endpoint", zap.Error(err))
}
return err
}

func (p *Plugin) send(body []byte) error {
Expand Down
5 changes: 3 additions & 2 deletions plugin/output/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v3"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/logger"
Expand Down Expand Up @@ -182,7 +181,7 @@ func (p *Plugin) Out(event *pipeline.Event) {
p.batcher.Add(event)
}

func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, workerBackoff *backoff.BackOff) {
func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
Expand All @@ -204,6 +203,8 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch, wor
data.outBuf = outBuf

p.write(outBuf)

return nil
}

func (p *Plugin) fileSealUpTicker(ctx context.Context) {
Expand Down
Loading

0 comments on commit 3219f31

Please sign in to comment.