Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Output retry exponentially #526

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/alicebob/miniredis/v2 v2.30.5
github.com/bitly/go-simplejson v0.5.1
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cespare/xxhash/v2 v2.2.0
github.com/euank/go-kmsg-parser v2.0.0+incompatible
github.com/go-redis/redis v6.15.9+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pg
github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
76 changes: 76 additions & 0 deletions pipeline/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package pipeline

import (
"context"
"time"

"github.com/cenkalti/backoff/v4"
)

// RetriableBatcher wraps a Batcher and retries a failed batch output using an
// exponential backoff policy.
type RetriableBatcher struct {
	// outFn is the output function that is retried while it returns an error.
	outFn RetriableBatcherOutFn
	// backoffOpts holds the retry settings. A backoff policy is built from
	// them on every Out call instead of being shared: the policies from
	// github.com/cenkalti/backoff are documented as not thread-safe, and
	// BatcherOptions.Workers suggests Out may run on several workers at once.
	backoffOpts BackoffOpts
	// batcher is the wrapped Batcher; its OutFn is set to Out.
	batcher *Batcher
	// onRetryError receives the last error once the policy stops retrying.
	// It may be nil, in which case the error is dropped.
	onRetryError func(err error)
}

// RetriableBatcherOutFn outputs a batch and reports a failure through the
// returned error; a non-nil error makes RetriableBatcher try again.
type RetriableBatcherOutFn func(*WorkerData, *Batch) error

// BackoffOpts configures the retry policy of a RetriableBatcher.
type BackoffOpts struct {
	// MinRetention is the initial interval between attempts.
	MinRetention time.Duration
	// Multiplier is the factor applied to the interval after each retry.
	Multiplier float64
	// AttemptNum is the retry limit handed to backoff.WithMaxRetries.
	AttemptNum uint64
}

// NewRetriableBatcher creates a RetriableBatcher that runs batcherOutFn with
// retries configured by opts. The Batcher is created from batcherOpts with
// its OutFn replaced by the retrying wrapper. onError is called with the last
// error when the retries are exhausted.
func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher {
	batcherBackoff := &RetriableBatcher{
		outFn:        batcherOutFn,
		backoffOpts:  opts,
		onRetryError: onError,
	}
	batcherBackoff.setBatcher(batcherOpts)
	return batcherBackoff
}

// setBatcher creates the wrapped Batcher with Out as its output function.
// It works on a copy of the options so the caller's struct is not modified.
func (b *RetriableBatcher) setBatcher(batcherOpts *BatcherOptions) {
	opts := *batcherOpts
	opts.OutFn = b.Out
	b.batcher = NewBatcher(opts)
}

// Out calls the output function for the batch and retries it according to the
// backoff settings while it keeps returning an error. If the policy gives up,
// the last error is passed to the onError callback.
func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) {
	// Build a private policy for this call: sharing one instance between
	// concurrent calls would race on its internal counters.
	boff := GetBackoff(
		b.backoffOpts.MinRetention,
		b.backoffOpts.Multiplier,
		b.backoffOpts.AttemptNum,
	)
	// Reset makes the policy start from the configured initial interval.
	boff.Reset()

	err := backoff.Retry(func() error {
		return b.outFn(data, batch)
	}, boff)
	if err != nil && b.onRetryError != nil {
		b.onRetryError(err)
	}
}

// Start starts the wrapped batcher with the given context.
func (b *RetriableBatcher) Start(ctx context.Context) {
	b.batcher.Start(ctx)
}

// Stop stops the wrapped batcher.
func (b *RetriableBatcher) Stop() {
	b.batcher.Stop()
}

// Add hands the event over to the wrapped batcher.
func (b *RetriableBatcher) Add(event *Event) {
	b.batcher.Add(event)
}

// GetBackoff builds an exponential backoff policy limited to attemptNum
// retries. minRetention is the initial interval between attempts and
// multiplier is the factor applied to the interval after every retry. The
// interval is randomized by a factor of 0.5; all other settings keep the
// defaults of backoff.NewExponentialBackOff.
//
// The returned policy is not safe for concurrent use; create one per
// goroutine.
func GetBackoff(minRetention time.Duration, multiplier float64, attemptNum uint64) backoff.BackOff {
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.InitialInterval = minRetention
	expBackoff.Multiplier = multiplier
	expBackoff.RandomizationFactor = 0.5
	// The fields above are changed after construction, so reset the policy
	// to make its current interval start from minRetention even when the
	// caller uses it without calling Reset first.
	expBackoff.Reset()
	return backoff.WithMaxRetries(expBackoff, attemptNum)
}
62 changes: 62 additions & 0 deletions pipeline/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package pipeline

import (
"errors"
"testing"

"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)

// TestBackoff checks that a succeeding output function is invoked once and
// that the error callback is not triggered.
func TestBackoff(t *testing.T) {
	var (
		failures atomic.Int32
		calls    atomic.Int32
	)

	onError := func(error) {
		failures.Inc()
	}
	succeed := func(*WorkerData, *Batch) error {
		calls.Inc()
		return nil
	}

	opts := &BatcherOptions{
		MetricCtl: metric.NewCtl("", prometheus.NewRegistry()),
	}
	retriable := NewRetriableBatcher(opts, succeed, BackoffOpts{AttemptNum: 3}, onError)

	retriable.Out(nil, nil)

	// Both counters start at zero: no error is expected and exactly one call.
	assert.Equal(t, int32(0), failures.Load(), "wrong error count")
	assert.Equal(t, int32(1), calls.Load(), "wrong event count")
}

// TestBackoffWithError checks that an always-failing output function is
// retried and that the error callback fires once after the retries run out.
func TestBackoffWithError(t *testing.T) {
	const attemptNum = 3

	errorCount := &atomic.Int32{}
	prevValue := errorCount.Load()
	errorFn := func(err error) {
		errorCount.Inc()
	}

	// attemptCount records how many times the output function was invoked,
	// so the test fails if no retry happens at all.
	attemptCount := &atomic.Int32{}

	batcherBackoff := NewRetriableBatcher(
		&BatcherOptions{
			MetricCtl: metric.NewCtl("", prometheus.NewRegistry()),
		},
		func(workerData *WorkerData, batch *Batch) error {
			attemptCount.Inc()
			return errors.New("some error")
		},
		BackoffOpts{AttemptNum: attemptNum},
		errorFn,
	)

	batcherBackoff.Out(nil, nil)

	assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count")
	// The first call is not a retry, so the output runs AttemptNum+1 times.
	assert.Equal(t, int32(attemptNum+1), attemptCount.Load(), "wrong attempt count")
}
14 changes: 7 additions & 7 deletions plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -733,9 +733,9 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
actions:
- type: json_decode
field: message
actions:
- type: json_decode
field: message
output:
type: s3
file_config:
Expand All @@ -759,12 +759,12 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
actions:
- type: json_decode
field: message
actions:
- type: json_decode
field: message
output:
type: s3
file_plugin:
file_config:
retention_interval: 10s
# endpoint, access_key, secret_key, bucket are required.
endpoint: "s3.fake_host.org:80"
Expand Down
14 changes: 7 additions & 7 deletions plugin/output/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
actions:
- type: json_decode
field: message
actions:
- type: json_decode
field: message
output:
type: s3
file_config:
Expand All @@ -98,12 +98,12 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
actions:
- type: json_decode
field: message
actions:
- type: json_decode
field: message
output:
type: s3
file_plugin:
file_config:
retention_interval: 10s
# endpoint, access_key, secret_key, bucket are required.
endpoint: "s3.fake_host.org:80"
Expand Down
15 changes: 14 additions & 1 deletion plugin/output/clickhouse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,14 @@ If the strict mode is enabled file.d fails (exit with code 1) in above examples.
**`retry`** *`int`* *`default=10`*

Retries of insertion. If File.d cannot insert for this number of attempts,
File.d will fall with non-zero exit code.
File.d either exits with a non-zero exit code or skips the message (see `fatal_on_failed_insert`).

<br>

**`fatal_on_failed_insert`** *`bool`* *`default=false`*

Whether File.d exits with a non-zero exit code when an insert still fails after all retries.
If `false`, the error is only logged.
**Experimental feature**

<br>

Expand All @@ -128,6 +135,12 @@ Retention milliseconds for retry to DB.

<br>

**`retention_exponentially_multiplier`** *`int`* *`default=2`*

Multiplier for exponential increase of retention between retries

<br>

**`insert_timeout`** *`cfg.Duration`* *`default=10s`*

Timeout for each insert request.
Expand Down
73 changes: 54 additions & 19 deletions plugin/output/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

/*{ introduction
Expand All @@ -43,7 +44,7 @@
logger *zap.Logger

config *Config
batcher *pipeline.Batcher
batcher *pipeline.RetriableBatcher
ctx context.Context
cancelFunc context.CancelFunc

Expand Down Expand Up @@ -227,9 +228,15 @@
// > @3@4@5@6
// >
// > Retries of insertion. If File.d cannot insert for this number of attempts,
// > File.d will fall with non-zero exit code.
// > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).
Retry int `json:"retry" default:"10"` // *

// > @3@4@5@6
// >
// > After an insert error, fall with a non-zero exit code or not
// > **Experimental feature**
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *

// > @3@4@5@6
// >
// > Additional settings to the Clickhouse.
Expand All @@ -242,6 +249,11 @@
Retention cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // *
Retention_ time.Duration

// > @3@4@5@6
// >
// > Multiplier for exponential increase of retention between retries
RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *

// > @3@4@5@6
// >
// > Timeout for each insert request.
Expand Down Expand Up @@ -328,9 +340,6 @@
p.registerMetrics(params.MetricCtl)
p.ctx, p.cancelFunc = context.WithCancel(context.Background())

if p.config.Retry < 1 {
p.logger.Fatal("'retry' can't be <1")
}
if p.config.Retention_ < 1 {
p.logger.Fatal("'retention' can't be <1")
}
Expand Down Expand Up @@ -405,17 +414,42 @@
}
}

p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
batcherOpts := pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
OutFn: p.out,
Controller: params.Controller,
Workers: p.config.WorkersCount_,
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MetricCtl: params.MetricCtl,
})
}

backoffOpts := pipeline.BackoffOpts{
MinRetention: p.config.Retention_,
Multiplier: float64(p.config.RetentionExponentMultiplier),
AttemptNum: uint64(p.config.Retry),
}

onError := func(err error) {
var level zapcore.Level
if p.config.FatalOnFailedInsert {
level = zapcore.FatalLevel
} else {
level = zapcore.ErrorLevel
}

Check warning on line 440 in plugin/output/clickhouse/clickhouse.go

View check run for this annotation

Codecov / codecov/patch

plugin/output/clickhouse/clickhouse.go#L435-L440

Added lines #L435 - L440 were not covered by tests

p.logger.Log(level, "can't insert to the table", zap.Error(err),
zap.Int("retries", p.config.Retry),
zap.String("table", p.config.Table))

Check warning on line 444 in plugin/output/clickhouse/clickhouse.go

View check run for this annotation

Codecov / codecov/patch

plugin/output/clickhouse/clickhouse.go#L442-L444

Added lines #L442 - L444 were not covered by tests
}

p.batcher = pipeline.NewRetriableBatcher(
&batcherOpts,
p.out,
backoffOpts,
onError,
)

p.batcher.Start(p.ctx)
}
Expand Down Expand Up @@ -443,7 +477,7 @@
}
}

func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
if *workerData == nil {
// we don't check the error, schema already validated in the Start
columns, _ := inferInsaneColInputs(p.config.Columns)
Expand Down Expand Up @@ -484,22 +518,23 @@
})

var err error
for try := 0; try < p.config.Retry; try++ {
for i := range p.instances {
requestID := p.requestID.Inc()
clickhouse := p.getInstance(requestID, try)
err = p.do(clickhouse, data.input)
clickhouse := p.getInstance(requestID, i)
err := p.do(clickhouse, data.input)
if err == nil {
break
return nil
}
p.insertErrorsMetric.Inc()
time.Sleep(p.config.Retention_)
p.logger.Error("an attempt to insert a batch failed", zap.Error(err))
}
if err != nil {
p.logger.Fatal("can't insert to the table", zap.Error(err),
zap.Int("retries", p.config.Retry),
zap.String("table", p.config.Table))
p.insertErrorsMetric.Inc()
p.logger.Error(
"an attempt to insert a batch failed",
zap.Error(err),
)

Check warning on line 534 in plugin/output/clickhouse/clickhouse.go

View check run for this annotation

Codecov / codecov/patch

plugin/output/clickhouse/clickhouse.go#L530-L534

Added lines #L530 - L534 were not covered by tests
}

return err

Check warning on line 537 in plugin/output/clickhouse/clickhouse.go

View check run for this annotation

Codecov / codecov/patch

plugin/output/clickhouse/clickhouse.go#L537

Added line #L537 was not covered by tests
}

func (p *Plugin) do(clickhouse Clickhouse, queryInput proto.Input) error {
Expand Down
Loading
Loading