diff --git a/backoff/backoff.go b/backoff/backoff.go
new file mode 100644
index 000000000..d264edc18
--- /dev/null
+++ b/backoff/backoff.go
@@ -0,0 +1,103 @@
+package backoff
+
+import (
+	"context"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	prom "github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+	ExpBackoffDefaultMultiplier = 1.2
+	ExpBackoffDefaultRndFactor  = 0.25
+)
+
+// BackOff is a wrapper that provides retry mechanism.
+type BackOff struct {
+	counter prom.Counter
+	backoff backoff.BackOff
+	timeout time.Duration
+}
+
+// RetriesCfg describes retries count.
+type RetriesCfg struct {
+	Limited bool
+	Limit   uint64
+}
+
+// New instance of Backoff.
+func New(
+	ctx context.Context,
+	counter prom.Counter,
+	timeout time.Duration,
+	retriesCfg RetriesCfg,
+	opts ...Option,
+) *BackOff {
+	var expBackoff backoff.BackOff
+	expBackoff = backoff.WithContext(new(opts...), ctx)
+	if retriesCfg.Limited {
+		expBackoff = backoff.WithMaxRetries(expBackoff, retriesCfg.Limit)
+	}
+
+	return &BackOff{
+		counter: counter,
+		backoff: expBackoff,
+		timeout: timeout,
+	}
+}
+
+// RetryWithMetrics processes given lambda and increments error metric on fail.
+func (b *BackOff) RetryWithMetrics(ctx context.Context, executor func(ctx context.Context) error) error {
+	err := backoff.Retry(func() error {
+		ctx, cancel := context.WithTimeout(ctx, b.timeout)
+		defer cancel()
+
+		if execErr := executor(ctx); execErr != nil {
+			b.counter.Inc()
+			return execErr
+		}
+		return nil
+	}, b.backoff)
+	return err
+}
+
+// New returns exponential backoff.
+func new(opts ...Option) *backoff.ExponentialBackOff {
+	backoff := backoff.NewExponentialBackOff()
+	for _, opt := range opts {
+		opt(backoff)
+	}
+	return backoff
+}
+
+// Option configures backoff.
+type Option func(*backoff.ExponentialBackOff)
+
+// InitialIntervalOpt set interval.
+func InitialIntervalOpt(initInterval time.Duration) Option { + return func(expBackoff *backoff.ExponentialBackOff) { + expBackoff.InitialInterval = initInterval + } +} + +// RandomizationFactor set rand factor. +func RandomizationFactor(factor float64) Option { + return func(expBackoff *backoff.ExponentialBackOff) { + expBackoff.RandomizationFactor = factor + } +} + +// Multiplier sets mult. +func Multiplier(multiplier float64) Option { + return func(expBackoff *backoff.ExponentialBackOff) { + expBackoff.Multiplier = multiplier + } +} + +// MaxInterval set max interval. +func MaxInterval(maxInterval time.Duration) Option { + return func(expBackoff *backoff.ExponentialBackOff) { + expBackoff.MaxInterval = maxInterval + } +} diff --git a/backoff/backoff_test.go b/backoff/backoff_test.go new file mode 100644 index 000000000..546366736 --- /dev/null +++ b/backoff/backoff_test.go @@ -0,0 +1,250 @@ +package backoff + +import ( + "context" + "errors" + "reflect" + "sync" + "testing" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/ozontech/file.d/stats" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func TestNew(t *testing.T) { + stats.InitStats() + + tCases := []struct { + name string + opts []Option + counter prom.Counter + timeout time.Duration + retriesCfg RetriesCfg + expParams []interface{} + expInitInterval time.Duration + expRandFactor float64 + expMulti float64 + expMaxInterval time.Duration + }{ + { + name: "initial_interval_opt", + opts: []Option{ + InitialIntervalOpt(time.Nanosecond * 20), + }, + counter: stats.GetCounter("random", "random"), + timeout: time.Second, + retriesCfg: RetriesCfg{Limited: false, Limit: 0}, + expInitInterval: time.Nanosecond * 20, + expRandFactor: backoff.DefaultRandomizationFactor, + expMulti: backoff.DefaultMultiplier, + expMaxInterval: backoff.DefaultMaxInterval, + }, + { + name: "initial_interval_opt", + opts: []Option{ + InitialIntervalOpt(time.Nanosecond * 20), 
+ }, + counter: stats.GetCounter("random", "random"), + timeout: time.Second, + retriesCfg: RetriesCfg{Limited: false, Limit: 0}, + expInitInterval: time.Nanosecond * 20, + expRandFactor: backoff.DefaultRandomizationFactor, + expMulti: backoff.DefaultMultiplier, + expMaxInterval: backoff.DefaultMaxInterval, + }, + { + name: "randomization_factor_opt", + opts: []Option{ + RandomizationFactor(5.5), + }, + counter: stats.GetCounter("random", "random"), + timeout: time.Second, + retriesCfg: RetriesCfg{Limited: false, Limit: 0}, + expInitInterval: backoff.DefaultInitialInterval, + expRandFactor: 5.5, + expMulti: backoff.DefaultMultiplier, + expMaxInterval: backoff.DefaultMaxInterval, + }, + { + name: "multiplier_opt", + opts: []Option{ + Multiplier(4.4), + }, + counter: stats.GetCounter("random", "random"), + timeout: time.Second, + retriesCfg: RetriesCfg{Limited: false, Limit: 0}, + expInitInterval: backoff.DefaultInitialInterval, + expRandFactor: backoff.DefaultRandomizationFactor, + expMulti: 4.4, + expMaxInterval: backoff.DefaultMaxInterval, + }, + { + name: "max_interval_opt", + opts: []Option{ + MaxInterval(time.Nanosecond * 44), + }, + counter: stats.GetCounter("random", "random"), + timeout: time.Second, + retriesCfg: RetriesCfg{Limited: false, Limit: 0}, + expInitInterval: backoff.DefaultInitialInterval, + expRandFactor: backoff.DefaultRandomizationFactor, + expMulti: backoff.DefaultMultiplier, + expMaxInterval: time.Nanosecond * 44, + }, + { + name: "all_opt", + opts: []Option{ + InitialIntervalOpt(time.Nanosecond * 20), + RandomizationFactor(2.2), + Multiplier(8.8), + MaxInterval(time.Microsecond * 3), + }, + counter: stats.GetCounter("random", "random"), + timeout: time.Second, + retriesCfg: RetriesCfg{Limited: false, Limit: 0}, + expInitInterval: time.Nanosecond * 20, + expRandFactor: 2.2, + expMulti: 8.8, + expMaxInterval: time.Microsecond * 3, + }, + } + + for _, tCase := range tCases { + t.Run(tCase.name, func(t *testing.T) { + expBackoff := 
New(context.Background(), tCase.counter, tCase.timeout, tCase.retriesCfg, tCase.opts...) + backoffCtx := expBackoff.backoff.(backoff.BackOffContext) + + // get backoff.ExponentialBackoff under unexportable backoff.backOffContext + underlyingBackoff := reflect.Indirect(reflect.ValueOf(backoffCtx)).FieldByName("BackOff").Interface().(*backoff.ExponentialBackOff) + require.Equal(t, tCase.expInitInterval, underlyingBackoff.InitialInterval) + require.Equal(t, tCase.expRandFactor, underlyingBackoff.RandomizationFactor) + require.Equal(t, tCase.expMulti, underlyingBackoff.Multiplier) + require.Equal(t, tCase.expMaxInterval, underlyingBackoff.MaxInterval) + }) + } +} + +func TestNewWithRetries(t *testing.T) { + limit := uint64(10) + expBackoff := New(context.Background(), nil, time.Nanosecond, RetriesCfg{Limited: true, Limit: limit}) + tries := reflect.Indirect(reflect.ValueOf(expBackoff.backoff)).FieldByName("maxTries").Uint() + + require.Equal(t, limit, tries) +} + +func TestExec(t *testing.T) { + stats.InitStats() + stats.RegisterCounter(&stats.MetricDesc{ + Subsystem: "backoff_subsys", + Name: "backoff_cnt_test", + Help: "For tests", + }) + counter := stats.GetCounter("backoff_subsys", "backoff_cnt_test") + + ctx := context.Background() + + retry := 10 + expBackoff := New(context.Background(), counter, time.Second, RetriesCfg{Limited: true, Limit: uint64(retry)}) + + var wg sync.WaitGroup + wg.Add(1) + executor := func(context.Context) error { + defer wg.Done() + return nil + } + + var err error + func() { + err = expBackoff.RetryWithMetrics(ctx, executor) + }() + + wg.Wait() + require.NoError(t, err) +} + +func TestExecError(t *testing.T) { + stats.InitStats() + stats.RegisterCounter(&stats.MetricDesc{ + Subsystem: "backoff_subsys", + Name: "backoff_cnt_test", + Help: "For tests", + }) + counter := stats.GetCounter("backoff_subsys", "backoff_cnt_test") + + expErr := errors.New("some error") + + ctx := context.Background() + retry := 10 + expBackoff := New( + 
context.Background(), + counter, + time.Second, + RetriesCfg{Limited: true, Limit: uint64(retry) - 1}, + InitialIntervalOpt(time.Nanosecond), + MaxInterval(time.Nanosecond), + ) + + var wg sync.WaitGroup + wg.Add(retry) + executor := func(context.Context) error { + defer wg.Done() + return expErr + } + + var err error + func() { + err = expBackoff.RetryWithMetrics(ctx, executor) + }() + + wg.Wait() + require.Error(t, err) + require.EqualError(t, expErr, err.Error()) +} + +func TestExecSuccessAfterRetry(t *testing.T) { + stats.InitStats() + stats.RegisterCounter(&stats.MetricDesc{ + Subsystem: "backoff_subsys", + Name: "backoff_cnt_test", + Help: "For tests", + }) + counter := stats.GetCounter("backoff_subsys", "backoff_cnt_test") + + expErr := errors.New("some error") + + ctx := context.Background() + + expBackoff := New( + context.Background(), + counter, + time.Second, + RetriesCfg{Limited: false}, + InitialIntervalOpt(time.Nanosecond), + MaxInterval(time.Nanosecond), + ) + + successAfter := 10 + i := 0 + var wg sync.WaitGroup + wg.Add(successAfter + 1) + executor := func(context.Context) error { + defer wg.Done() + + if i >= successAfter { + return nil + } + i++ + return expErr + } + + var err error + func() { + err = expBackoff.RetryWithMetrics(ctx, executor) + }() + + wg.Wait() + require.NoError(t, err) +} diff --git a/go.mod b/go.mod index f0e92d5ec..5fcdcd38e 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/alecthomas/kingpin v2.2.6+incompatible github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d github.com/bitly/go-simplejson v0.5.0 + github.com/cenkalti/backoff/v4 v4.1.2 github.com/euank/go-kmsg-parser v2.0.0+incompatible github.com/ghodss/yaml v1.0.0 github.com/golang/mock v1.6.0 @@ -36,7 +37,6 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/cenkalti/backoff/v3 v3.0.0 // indirect - github.com/cenkalti/backoff/v4 v4.1.2 // indirect 
github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/eapache/go-resiliency v1.2.0 // indirect diff --git a/go.sum b/go.sum index 41147105e..651a92b76 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c= github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= +github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= +github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index 996f2ffea..884fbfc8c 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -10,13 +10,13 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" - insaneJSON "github.com/vitkovskii/insane-json" - "go.uber.org/zap" - + "github.com/ozontech/file.d/backoff" "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/stats" + insaneJSON "github.com/vitkovskii/insane-json" + "go.uber.org/zap" ) /*{ introduction @@ -43,6 +43,7 @@ const ( // metrics discardedEventCounter = "event_discarded" duplicatedEventCounter = "event_duplicated" + sendErrorCounter = "send_error" ) type pgType int @@ -69,13 +70,14 @@ type Plugin struct { ctx context.Context 
cancelFunc context.CancelFunc + backOff *backoff.BackOff queryBuilder PgQueryBuilder pool PgxIface } type ConfigColumn struct { Name string `json:"name" required:"true"` - ColumnType string `json:"type" required:"true" options:"int|string|bool|timestamp"` + ColumnType string `json:"type" required:"true" options:"int|string|timestamp"` Unique bool `json:"unique" default:"false"` } @@ -123,14 +125,15 @@ type Config struct { //> @3@4@5@6 //> //> Timeout for DB requests in milliseconds. - DBRequestTimeout cfg.Duration `json:"db_request_timeout" default:"3000ms" parse:"duration"` //* - DBRequestTimeout_ time.Duration + //> Timeouts can differ due using exponential backoff. + RequestTimeout cfg.Duration `json:"db_request_timeout" default:"3000ms" parse:"duration"` //* + RequestTimeout_ time.Duration //> @3@4@5@6 //> //> Timeout for DB health check. - DBHealthCheckPeriod cfg.Duration `json:"db_health_check_period" default:"60s" parse:"duration"` //* - DBHealthCheckPeriod_ time.Duration + HealthCheckInterval cfg.Duration `json:"db_health_check_interval" default:"60s" parse:"duration"` //* + HealthCheckInterval_ time.Duration //> @3@4@5@6 //> @@ -174,8 +177,14 @@ func (p *Plugin) registerPluginMetrics() { Subsystem: subsystemName, Help: "Total pgsql duplicated messages", }) + stats.RegisterCounter(&stats.MetricDesc{ + Name: sendErrorCounter, + Subsystem: subsystemName, + Help: "Total pgsql send errors", + }) } +// Start plugin. 
 func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
 	p.controller = params.Controller
 	p.logger = params.Logger
@@ -191,10 +200,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
 	if p.config.Retention_ < 1 {
 		p.logger.Fatal("'renetion' can't be <1")
 	}
-	if p.config.DBRequestTimeout_ < 1 {
+	if p.config.RequestTimeout_ < 1 {
 		p.logger.Fatal("'db_request_timeout' can't be <1")
 	}
-	if p.config.DBHealthCheckPeriod_ < 1 {
+	if p.config.HealthCheckInterval_ < 1 {
 		p.logger.Fatal("'db_health_check_period' can't be <1")
 	}
 
@@ -216,6 +225,20 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
 	}
 	p.pool = pool
 
+	p.backOff = backoff.New(
+		p.ctx,
+		stats.GetCounter(subsystemName, sendErrorCounter),
+		p.config.RequestTimeout_,
+		backoff.RetriesCfg{
+			Limited: true,
+			Limit:   uint64(p.config.Retry),
+		},
+		backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier),
+		backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor),
+		backoff.InitialIntervalOpt(p.config.Retention_),
+		backoff.MaxInterval(p.config.Retention_*2),
+	)
+
 	p.batcher = pipeline.NewBatcher(
 		params.PipelineName,
 		outPluginType,
@@ -235,6 +258,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
 	p.batcher.Start(ctx)
 }
 
+// Stop plugin.
 func (p *Plugin) Stop() {
 	p.cancelFunc()
 	p.batcher.Stop()
@@ -306,33 +330,22 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) {
 		argsSliceInterface[i] = args[i-1]
 	}
 
-	var ctx context.Context
-	var cancel context.CancelFunc
-	var outErr error
 	// Insert into pg with retry.
-	for i := p.config.Retry; i > 0; i-- {
-		ctx, cancel = context.WithTimeout(p.ctx, p.config.DBRequestTimeout_)
-
-		p.logger.Info(query, args)
+	if err = p.backOff.RetryWithMetrics(p.ctx, func(ctx context.Context) error {
 		rows, err := p.pool.Query(ctx, query, argsSliceInterface...)
defer func() { - rows.Close() + if rows != nil { + rows.Close() + } }() if err != nil { - outErr = err - p.logger.Infof("rows: %v, err: %s", rows, err.Error()) - cancel() - time.Sleep(p.config.Retention_) - continue - } else { - outErr = nil - break + p.logger.Errorf("can't insert query: %s, args: %v: %s", query, args, err.Error()) + return err } - } - - if outErr != nil { + return nil + }); err != nil { p.pool.Close() - p.logger.Fatalf("Failed insert into %s. query: %s, args: %v, err: %v", p.config.Table, query, args, outErr) + p.logger.Fatalf("can't insert. query: %s, args: %v: %s", query, args, err.Error()) } } @@ -399,7 +412,7 @@ func (p *Plugin) parsePGConfig() (*pgxpool.Config, error) { } pgCfg.LazyConnect = false - pgCfg.HealthCheckPeriod = p.config.DBHealthCheckPeriod_ + pgCfg.HealthCheckPeriod = p.config.HealthCheckInterval_ return pgCfg, nil } diff --git a/plugin/output/postgres/postgres_test.go b/plugin/output/postgres/postgres_test.go index 68ab8399e..44e838b17 100644 --- a/plugin/output/postgres/postgres_test.go +++ b/plugin/output/postgres/postgres_test.go @@ -10,6 +10,7 @@ import ( "github.com/golang/mock/gomock" "github.com/jackc/pgconn" "github.com/jackc/pgproto3/v2" + "github.com/ozontech/file.d/backoff" "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" mock_pg "github.com/ozontech/file.d/plugin/output/postgres/mock" @@ -61,9 +62,10 @@ func TestPrivateOut(t *testing.T) { table := "table1" + retryCnt := 0 config := Config{ Columns: columns, - Retry: 3, + Retry: retryCnt, } ctl := gomock.NewController(t) @@ -83,12 +85,27 @@ func TestPrivateOut(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) + backOff := backoff.New( + ctx, + stats.GetCounter("random_test", "random_test"), + time.Second*5, + backoff.RetriesCfg{ + Limited: true, + Limit: 10, + }, + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + 
backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), + ) + p := &Plugin{ config: &config, queryBuilder: builder, pool: pool, logger: testLogger, ctx: ctx, + backOff: backOff, } p.registerPluginMetrics() @@ -133,9 +150,10 @@ func TestPrivateOutWithRetry(t *testing.T) { table := "table1" + retryCnt := 3 config := Config{ Columns: columns, - Retry: 3, + Retry: retryCnt, } ctl := gomock.NewController(t) @@ -160,12 +178,27 @@ func TestPrivateOutWithRetry(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) + backOff := backoff.New( + ctx, + stats.GetCounter("random_test", "random_test"), + time.Second*5, + backoff.RetriesCfg{ + Limited: true, + Limit: 10, + }, + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), + ) + p := &Plugin{ config: &config, queryBuilder: builder, pool: pool, logger: testLogger, ctx: ctx, + backOff: backOff, } p.registerPluginMetrics() @@ -209,18 +242,33 @@ func TestPrivateOutNoGoodEvents(t *testing.T) { table := "table1" + retryCnt := 0 config := Config{ Columns: columns, - Retry: 3, + Retry: retryCnt, } builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) + backOff := backoff.New( + context.Background(), + stats.GetCounter("random_test", "random_test"), + time.Second*5, + backoff.RetriesCfg{ + Limited: true, + Limit: 10, + }, + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), + ) p := &Plugin{ config: &config, queryBuilder: builder, logger: testLogger, + backOff: backOff, } p.registerPluginMetrics() @@ -275,9 +323,10 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) { table := "table1" + retryCnt := 1 config := Config{ Columns: columns, - Retry: 3, + Retry: 
retryCnt, } ctl := gomock.NewController(t) @@ -297,12 +346,26 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) + backOff := backoff.New( + ctx, + stats.GetCounter("random_test", "random_test"), + time.Second*5, + backoff.RetriesCfg{ + Limited: true, + Limit: 10, + }, + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), + ) p := &Plugin{ config: &config, queryBuilder: builder, pool: pool, logger: testLogger, ctx: ctx, + backOff: backOff, } p.registerPluginMetrics() @@ -449,9 +512,10 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te table := "table1" + retryCnt := 999 config := Config{ Columns: columns, - Retry: 3, + Retry: retryCnt, } ctl := gomock.NewController(t) @@ -472,12 +536,26 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) + backOff := backoff.New( + context.Background(), + stats.GetCounter("random_test", "random_test"), + time.Second*5, + backoff.RetriesCfg{ + Limited: true, + Limit: 10, + }, + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), + ) p := &Plugin{ config: &config, queryBuilder: builder, pool: pool, logger: testLogger, ctx: ctx, + backOff: backOff, } p.registerPluginMetrics() diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index d8478d5aa..d4f5ab736 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -10,6 +10,7 @@ import ( "net/http" "time" + "github.com/ozontech/file.d/backoff" "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/fd" 
"github.com/ozontech/file.d/pipeline" @@ -30,12 +31,14 @@ const ( ) type Plugin struct { + ctx context.Context config *Config client http.Client logger *zap.SugaredLogger avgEventSize int batcher *pipeline.Batcher controller pipeline.OutputPluginController + backoff *backoff.BackOff } //! config-params @@ -99,7 +102,18 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.client = p.newClient(p.config.RequestTimeout_) p.registerPluginMetrics() - + p.ctx = context.TODO() + p.backoff = backoff.New( + p.ctx, + stats.GetCounter(subsystemName, sendErrorCounter), + // No concrete time limit here. + time.Hour, + backoff.RetriesCfg{Limited: false}, + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), + ) p.batcher = pipeline.NewBatcher( params.PipelineName, outPluginType, @@ -112,7 +126,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP 0, ) - p.batcher.Start(context.TODO()) + p.batcher.Start(p.ctx) } func (p *Plugin) registerPluginMetrics() { @@ -157,18 +171,15 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { p.logger.Debugf("Trying to send: %s", outBuf) - for { - err := p.send(outBuf) - if err != nil { - stats.GetCounter(subsystemName, sendErrorCounter).Inc() - p.logger.Errorf("Can't send data to splunk address=%s: %s", p.config.Endpoint, err.Error()) - time.Sleep(time.Second) - - continue + _ = p.backoff.RetryWithMetrics(p.ctx, func(ctx context.Context) error { + sendErr := p.send(outBuf) + if sendErr != nil { + p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, sendErr.Error()) + return sendErr } + return nil + }) - break - } p.logger.Debugf("Successfully sent: %s", outBuf) } diff --git a/plugin/output/splunk/splunk_test.go b/plugin/output/splunk/splunk_test.go index 385a222e3..10cec0f63 
100644 --- a/plugin/output/splunk/splunk_test.go +++ b/plugin/output/splunk/splunk_test.go @@ -1,18 +1,23 @@ package splunk import ( + "context" "io/ioutil" "net/http" "net/http/httptest" "testing" + "time" + "github.com/ozontech/file.d/backoff" "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/stats" "github.com/stretchr/testify/assert" insaneJSON "github.com/vitkovskii/insane-json" "go.uber.org/zap" ) func TestSplunk(t *testing.T) { + stats.InitStats() suites := []struct { name string input string @@ -44,11 +49,23 @@ func TestSplunk(t *testing.T) { })) defer testServer.Close() + ctx := context.TODO() plugin := Plugin{ + ctx: ctx, config: &Config{ Endpoint: testServer.URL, }, logger: zap.NewExample().Sugar(), + backoff: backoff.New( + ctx, + stats.GetCounter(subsystemName, sendErrorCounter), + time.Minute, + backoff.RetriesCfg{Limited: false}, + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), + ), } batch := pipeline.Batch{