Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Add exponential backoff for pg output plugin #74

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions backoff/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package backoff

import (
"context"
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
prom "github.com/prometheus/client_golang/prometheus"
)

const (
ExpBackoffDefaultMultiplier = 1.2
ExpBackoffDefaultRndFactor = 0.25
)

// BackOff is a wrapper that provides retry mechanism.
type BackOff struct {
counter prom.Counter
backoff backoff.BackOff
timeout time.Duration
}

// RetriesCfg desribes retries count.
type RetriesCfg struct {
Limited bool
Limit uint64
}

// New instance of Backoff.
func New(
ctx context.Context,
counter prom.Counter,
timeout time.Duration,
retriesCfg RetriesCfg,
opts ...Option,
) *BackOff {
var expBackoff backoff.BackOff
expBackoff = backoff.WithContext(new(opts...), ctx)
if retriesCfg.Limited {
expBackoff = backoff.WithMaxRetries(expBackoff, retriesCfg.Limit)
}

return &BackOff{
counter: counter,
backoff: expBackoff,
timeout: timeout,
}
}

// RetryWithMetrics processes given lambda and increments error metric on fail.
func (b *BackOff) RetryWithMetrics(ctx context.Context, executor func(ctx context.Context) error) error {
err := backoff.Retry(func() error {
ctx, cancel := context.WithTimeout(ctx, b.timeout)
defer cancel()

if execErr := executor(ctx); execErr != nil {
b.counter.Inc()
return execErr
}
return nil
}, b.backoff)
return err
}

// New returns exponential backoff.
func new(opts ...Option) *backoff.ExponentialBackOff {
backoff := backoff.NewExponentialBackOff()
for _, opt := range opts {
fmt.Println("op", opt)
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
opt(backoff)
}
fmt.Println(backoff)
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
return backoff
}

// Option confugures backoff.
type Option func(*backoff.ExponentialBackOff)

// InitialIntervalOpt set interval.
func InitialIntervalOpt(initInterval time.Duration) Option {
return func(expBackoff *backoff.ExponentialBackOff) {
expBackoff.InitialInterval = initInterval
}
}

// RandomizationFactor set rand factor.
func RandomizationFactor(factor float64) Option {
return func(expBackoff *backoff.ExponentialBackOff) {
expBackoff.RandomizationFactor = factor
}
}

// Multiplier sets mult.
func Multiplier(multiplier float64) Option {
return func(expBackoff *backoff.ExponentialBackOff) {
expBackoff.Multiplier = multiplier
}
}

// MaxInterval set max interval.
func MaxInterval(maxInterval time.Duration) Option {
return func(expBackoff *backoff.ExponentialBackOff) {
expBackoff.MaxInterval = maxInterval
}
}
250 changes: 250 additions & 0 deletions backoff/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package backoff

import (
"context"
"errors"
"reflect"
"sync"
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ozontech/file.d/stats"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func TestNew(t *testing.T) {
stats.InitStats()

tCases := []struct {
name string
opts []Option
counter prom.Counter
timeout time.Duration
retriesCfg RetriesCfg
expParams []interface{}
expInitInterval time.Duration
expRandFactor float64
expMulti float64
expMaxInterval time.Duration
}{
{
name: "initial_interval_opt",
opts: []Option{
InitialIntervalOpt(time.Nanosecond * 20),
},
counter: stats.GetCounter("random", "random"),
timeout: time.Second,
retriesCfg: RetriesCfg{Limited: false, Limit: 0},
expInitInterval: time.Nanosecond * 20,
expRandFactor: backoff.DefaultRandomizationFactor,
expMulti: backoff.DefaultMultiplier,
expMaxInterval: backoff.DefaultMaxInterval,
},
{
name: "initial_interval_opt",
opts: []Option{
InitialIntervalOpt(time.Nanosecond * 20),
},
counter: stats.GetCounter("random", "random"),
timeout: time.Second,
retriesCfg: RetriesCfg{Limited: false, Limit: 0},
expInitInterval: time.Nanosecond * 20,
expRandFactor: backoff.DefaultRandomizationFactor,
expMulti: backoff.DefaultMultiplier,
expMaxInterval: backoff.DefaultMaxInterval,
},
{
name: "randomization_factor_opt",
opts: []Option{
RandomizationFactor(5.5),
},
counter: stats.GetCounter("random", "random"),
timeout: time.Second,
retriesCfg: RetriesCfg{Limited: false, Limit: 0},
expInitInterval: backoff.DefaultInitialInterval,
expRandFactor: 5.5,
expMulti: backoff.DefaultMultiplier,
expMaxInterval: backoff.DefaultMaxInterval,
},
{
name: "multiplier_opt",
opts: []Option{
Multiplier(4.4),
},
counter: stats.GetCounter("random", "random"),
timeout: time.Second,
retriesCfg: RetriesCfg{Limited: false, Limit: 0},
expInitInterval: backoff.DefaultInitialInterval,
expRandFactor: backoff.DefaultRandomizationFactor,
expMulti: 4.4,
expMaxInterval: backoff.DefaultMaxInterval,
},
{
name: "max_interval_opt",
opts: []Option{
MaxInterval(time.Nanosecond * 44),
},
counter: stats.GetCounter("random", "random"),
timeout: time.Second,
retriesCfg: RetriesCfg{Limited: false, Limit: 0},
expInitInterval: backoff.DefaultInitialInterval,
expRandFactor: backoff.DefaultRandomizationFactor,
expMulti: backoff.DefaultMultiplier,
expMaxInterval: time.Nanosecond * 44,
},
{
name: "all_opt",
opts: []Option{
InitialIntervalOpt(time.Nanosecond * 20),
RandomizationFactor(2.2),
Multiplier(8.8),
MaxInterval(time.Microsecond * 3),
},
counter: stats.GetCounter("random", "random"),
timeout: time.Second,
retriesCfg: RetriesCfg{Limited: false, Limit: 0},
expInitInterval: time.Nanosecond * 20,
expRandFactor: 2.2,
expMulti: 8.8,
expMaxInterval: time.Microsecond * 3,
},
}

for _, tCase := range tCases {
t.Run(tCase.name, func(t *testing.T) {
expBackoff := New(context.Background(), tCase.counter, tCase.timeout, tCase.retriesCfg, tCase.opts...)
backoffCtx := expBackoff.backoff.(backoff.BackOffContext)

// get backoff.ExponentialBackoff under unexportable backoff.backOffContext
underlyingBackoff := reflect.Indirect(reflect.ValueOf(backoffCtx)).FieldByName("BackOff").Interface().(*backoff.ExponentialBackOff)
require.Equal(t, tCase.expInitInterval, underlyingBackoff.InitialInterval)
require.Equal(t, tCase.expRandFactor, underlyingBackoff.RandomizationFactor)
require.Equal(t, tCase.expMulti, underlyingBackoff.Multiplier)
require.Equal(t, tCase.expMaxInterval, underlyingBackoff.MaxInterval)
})
}
}

func TestNewWithRetries(t *testing.T) {
limit := uint64(10)
expBackoff := New(context.Background(), nil, time.Nanosecond, RetriesCfg{Limited: true, Limit: limit})
tries := reflect.Indirect(reflect.ValueOf(expBackoff.backoff)).FieldByName("maxTries").Uint()

require.Equal(t, limit, tries)
}

func TestExec(t *testing.T) {
stats.InitStats()
stats.RegisterCounter(&stats.MetricDesc{
Subsystem: "backoff_subsys",
Name: "backoff_cnt_test",
Help: "For tests",
})
counter := stats.GetCounter("backoff_subsys", "backoff_cnt_test")

ctx := context.Background()

retry := 10
expBackoff := New(context.Background(), counter, time.Second, RetriesCfg{Limited: true, Limit: uint64(retry)})

var wg sync.WaitGroup
wg.Add(1)
executor := func(context.Context) error {
defer wg.Done()
return nil
}

var err error
func() {
err = expBackoff.RetryWithMetrics(ctx, executor)
}()

wg.Wait()
require.NoError(t, err)
}

func TestExecError(t *testing.T) {
stats.InitStats()
stats.RegisterCounter(&stats.MetricDesc{
Subsystem: "backoff_subsys",
Name: "backoff_cnt_test",
Help: "For tests",
})
counter := stats.GetCounter("backoff_subsys", "backoff_cnt_test")

expErr := errors.New("some error")

ctx := context.Background()
retry := 10
expBackoff := New(
context.Background(),
counter,
time.Second,
RetriesCfg{Limited: true, Limit: uint64(retry) - 1},
InitialIntervalOpt(time.Nanosecond),
MaxInterval(time.Nanosecond),
)

var wg sync.WaitGroup
wg.Add(retry)
executor := func(context.Context) error {
defer wg.Done()
return expErr
}

var err error
func() {
err = expBackoff.RetryWithMetrics(ctx, executor)
}()

wg.Wait()
require.Error(t, err)
require.EqualError(t, expErr, err.Error())
}

func TestExecSuccessAfterRetry(t *testing.T) {
stats.InitStats()
stats.RegisterCounter(&stats.MetricDesc{
Subsystem: "backoff_subsys",
Name: "backoff_cnt_test",
Help: "For tests",
})
counter := stats.GetCounter("backoff_subsys", "backoff_cnt_test")

expErr := errors.New("some error")

ctx := context.Background()

expBackoff := New(
context.Background(),
counter,
time.Second,
RetriesCfg{Limited: false},
InitialIntervalOpt(time.Nanosecond),
MaxInterval(time.Nanosecond),
)

successAfter := 10
i := 0
var wg sync.WaitGroup
wg.Add(successAfter + 1)
executor := func(context.Context) error {
defer wg.Done()

if i >= successAfter {
return nil
}
i++
return expErr
}

var err error
func() {
err = expBackoff.RetryWithMetrics(ctx, executor)
}()

wg.Wait()
require.NoError(t, err)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d
github.com/bitly/go-simplejson v0.5.0
github.com/cenkalti/backoff/v4 v4.1.2
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
github.com/euank/go-kmsg-parser v2.0.0+incompatible
github.com/ghodss/yaml v1.0.0
github.com/golang/mock v1.6.0
Expand Down Expand Up @@ -36,7 +37,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/cenkalti/backoff/v3 v3.0.0 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
Loading