From b863c884c770d4c37713997b83d6c30f989ed715 Mon Sep 17 00:00:00 2001 From: ansakharov Date: Thu, 14 Apr 2022 22:59:40 +0300 Subject: [PATCH 1/5] Add exponential backoff for pg output plugin --- Makefile | 2 +- go.mod | 1 + go.sum | 2 + plugin/output/postgres/postgres.go | 39 ++++++++++-------- plugin/output/postgres/postgres_test.go | 53 ++++++++++++++++++++++--- plugin/output/splunk/splunk.go | 28 ++++++++----- plugin/output/splunk/splunk_test.go | 13 +++++- 7 files changed, 104 insertions(+), 34 deletions(-) diff --git a/Makefile b/Makefile index 6fc42eaa9..cd1c5b95a 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -VERSION ?= v0.5.3 +VERSION ?= v0.5.4 UPSTREAM_BRANCH ?= origin/master .PHONY: prepare diff --git a/go.mod b/go.mod index 20773d69e..0054ab358 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/alecthomas/kingpin v2.2.6+incompatible github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d github.com/bitly/go-simplejson v0.5.0 + github.com/cenkalti/backoff/v4 v4.1.2 github.com/euank/go-kmsg-parser v2.0.0+incompatible github.com/ghodss/yaml v1.0.0 github.com/golang/mock v1.6.0 diff --git a/go.sum b/go.sum index e2e6d79cc..9910866da 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c= github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= +github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= +github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= 
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index 9d82e8c33..819e0d065 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -8,6 +8,7 @@ import ( "time" sq "github.com/Masterminds/squirrel" + "github.com/cenkalti/backoff/v4" "github.com/jackc/pgconn" "github.com/jackc/pgx/v4/pgxpool" "github.com/ozontech/file.d/cfg" @@ -65,6 +66,7 @@ type Plugin struct { ctx context.Context cancelFunc context.CancelFunc + backoff backoff.BackOff queryBuilder PgQueryBuilder pool PgxIface } @@ -238,6 +240,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP } p.pool = pool + stdBackoff := backoff.NewExponentialBackOff() + stdBackoff.Multiplier = 1.2 + stdBackoff.RandomizationFactor = 0.25 + stdBackoff.InitialInterval = p.config.Retention_ + stdBackoff.MaxInterval = p.config.Retention_ * 2 + + ctxBackoff := backoff.WithContext(stdBackoff, p.ctx) + expBackoff := backoff.WithMaxRetries(ctxBackoff, uint64(p.config.Retry)) + + p.backoff = expBackoff + p.batcher = pipeline.NewBatcher( params.PipelineName, outPluginType, @@ -321,31 +334,23 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) { p.logger.Fatalf("Invalid SQL. query: %s, args: %v, err: %v", query, args, err) } - var ctx context.Context - var cancel context.CancelFunc - var outErr error // Insert into pg with retry. - for i := p.config.Retry; i > 0; i-- { - ctx, cancel = context.WithTimeout(p.ctx, p.config.DBRequestTimeout_) + err = backoff.Retry(func() error { + ctx, cancel := context.WithTimeout(p.ctx, p.config.DBRequestTimeout_) + defer cancel() p.logger.Info(query, args) pgCommResult, err := p.pool.Exec(ctx, query, args...) 
if err != nil { - outErr = err p.logger.Infof("pgCommResult: %v, err: %s", pgCommResult, err.Error()) - cancel() - time.Sleep(p.config.Retention_) - continue - } else { - outErr = nil - break - } - } - cancel() - if outErr != nil { + return err + } + return nil + }, p.backoff) + if err != nil { p.pool.Close() - p.logger.Fatalf("Failed insert into %s. query: %s, args: %v, err: %v", p.config.Table, query, args, outErr) + p.logger.Fatalf("Failed insert into %s. query: %s, args: %v, err: %v", p.config.Table, query, args, err) } } diff --git a/plugin/output/postgres/postgres_test.go b/plugin/output/postgres/postgres_test.go index 465f17747..37550d295 100644 --- a/plugin/output/postgres/postgres_test.go +++ b/plugin/output/postgres/postgres_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/cenkalti/backoff/v4" "github.com/golang/mock/gomock" "github.com/jackc/pgconn" "github.com/ozontech/file.d/logger" @@ -60,9 +61,10 @@ func TestPrivateOut(t *testing.T) { table := "table1" + retryCnt := 0 config := Config{ Columns: columns, - Retry: 3, + Retry: retryCnt, } ctl := gomock.NewController(t) @@ -82,12 +84,20 @@ func TestPrivateOut(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) + retryBackoff := backoff.WithMaxRetries( + backoff.WithContext( + backoff.NewExponentialBackOff(), + ctx), + uint64(retryCnt), + ) + p := &Plugin{ config: &config, queryBuilder: builder, pool: pool, logger: testLogger, ctx: ctx, + backoff: retryBackoff, } p.registerPluginMetrics() @@ -132,9 +142,10 @@ func TestPrivateOutWithRetry(t *testing.T) { table := "table1" + retryCnt := 3 config := Config{ Columns: columns, - Retry: 3, + Retry: retryCnt, } ctl := gomock.NewController(t) @@ -159,12 +170,20 @@ func TestPrivateOutWithRetry(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) + retryBackoff := backoff.WithMaxRetries( + backoff.WithContext( + backoff.NewExponentialBackOff(), + ctx), + 3, + ) + p := &Plugin{ 
config: &config, queryBuilder: builder, pool: pool, logger: testLogger, ctx: ctx, + backoff: retryBackoff, } p.registerPluginMetrics() @@ -208,18 +227,26 @@ func TestPrivateOutNoGoodEvents(t *testing.T) { table := "table1" + retryCnt := 0 config := Config{ Columns: columns, - Retry: 3, + Retry: retryCnt, } builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) + retryBackoff := backoff.WithMaxRetries( + backoff.WithContext( + backoff.NewExponentialBackOff(), + context.Background()), + uint64(retryCnt), + ) p := &Plugin{ config: &config, queryBuilder: builder, logger: testLogger, + backoff: retryBackoff, } p.registerPluginMetrics() @@ -274,9 +301,10 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) { table := "table1" + retryCnt := 1 config := Config{ Columns: columns, - Retry: 3, + Retry: retryCnt, } ctl := gomock.NewController(t) @@ -296,12 +324,19 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) + retryBackoff := backoff.WithMaxRetries( + backoff.WithContext( + backoff.NewExponentialBackOff(), + ctx), + 3, + ) p := &Plugin{ config: &config, queryBuilder: builder, pool: pool, logger: testLogger, ctx: ctx, + backoff: retryBackoff, } p.registerPluginMetrics() @@ -448,9 +483,10 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te table := "table1" + retryCnt := 999 config := Config{ Columns: columns, - Retry: 3, + Retry: retryCnt, } ctl := gomock.NewController(t) @@ -471,12 +507,19 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) + retryBackoff := backoff.WithMaxRetries( + backoff.WithContext( + backoff.NewExponentialBackOff(), + ctx), + 3, + ) p := &Plugin{ config: &config, queryBuilder: builder, pool: pool, logger: testLogger, ctx: ctx, + backoff: retryBackoff, } p.registerPluginMetrics() diff --git 
a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index b4e7fa2c7..e5fd35131 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -10,6 +10,7 @@ import ( "net/http" "time" + "github.com/cenkalti/backoff/v4" "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/pipeline" @@ -36,6 +37,7 @@ type Plugin struct { avgEventSize int batcher *pipeline.Batcher controller pipeline.OutputPluginController + backoff backoff.BackOff } //! config-params @@ -98,6 +100,14 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.config = config.(*Config) p.client = p.newClient(p.config.RequestTimeout_) + ctx := context.TODO() + stdBackoff := backoff.NewExponentialBackOff() + stdBackoff.Multiplier = 1.2 + stdBackoff.RandomizationFactor = 0.25 + stdBackoff.InitialInterval = time.Second + stdBackoff.MaxInterval = stdBackoff.InitialInterval * 2 + p.backoff = backoff.WithContext(stdBackoff, ctx) + p.registerPluginMetrics() p.batcher = pipeline.NewBatcher( @@ -112,7 +122,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP 0, ) - p.batcher.Start(context.TODO()) + p.batcher.Start(ctx) } func (p *Plugin) registerPluginMetrics() { @@ -154,18 +164,16 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { insaneJSON.Release(root) data.outBuf = outBuf - for { - err := p.send(outBuf) - if err != nil { + _ = backoff.Retry(func() error { + sendErr := p.send(outBuf) + if sendErr != nil { stats.GetCounter(subsystemName, sendErrorCounter).Inc() - p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, err.Error()) - time.Sleep(time.Second) + p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, sendErr.Error()) - continue + return sendErr } - - break - } + return nil + }, p.backoff) } func (p *Plugin) maintenance(workerData *pipeline.WorkerData) {} diff --git 
a/plugin/output/splunk/splunk_test.go b/plugin/output/splunk/splunk_test.go index 385a222e3..13eeec267 100644 --- a/plugin/output/splunk/splunk_test.go +++ b/plugin/output/splunk/splunk_test.go @@ -1,11 +1,14 @@ package splunk import ( + "context" "io/ioutil" "net/http" "net/http/httptest" "testing" + "time" + "github.com/cenkalti/backoff/v4" "github.com/ozontech/file.d/pipeline" "github.com/stretchr/testify/assert" insaneJSON "github.com/vitkovskii/insane-json" @@ -44,11 +47,19 @@ func TestSplunk(t *testing.T) { })) defer testServer.Close() + ctx := context.TODO() + stdBackoff := backoff.NewExponentialBackOff() + stdBackoff.Multiplier = 1.2 + stdBackoff.RandomizationFactor = 0.25 + stdBackoff.InitialInterval = time.Second + stdBackoff.MaxInterval = stdBackoff.InitialInterval * 2 + plugin := Plugin{ config: &Config{ Endpoint: testServer.URL, }, - logger: zap.NewExample().Sugar(), + logger: zap.NewExample().Sugar(), + backoff: backoff.WithContext(stdBackoff, ctx), } batch := pipeline.Batch{ From 1406e6f843c2c798b01147ecd6120a5d8eebc148 Mon Sep 17 00:00:00 2001 From: ansakharov Date: Fri, 29 Apr 2022 13:53:58 +0300 Subject: [PATCH 2/5] Fixes from review --- consts/consts.go | 6 ++++++ plugin/output/postgres/postgres.go | 5 +++-- plugin/output/splunk/splunk.go | 5 +++-- 3 files changed, 12 insertions(+), 4 deletions(-) create mode 100644 consts/consts.go diff --git a/consts/consts.go b/consts/consts.go new file mode 100644 index 000000000..aae1f4998 --- /dev/null +++ b/consts/consts.go @@ -0,0 +1,6 @@ +package consts + +const ( + ExpBackoffDefaultMultiplier = 1.2 + ExpBackoffDefaultRndFactor = 0.25 +) diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index 819e0d065..7ffc4e409 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -12,6 +12,7 @@ import ( "github.com/jackc/pgconn" "github.com/jackc/pgx/v4/pgxpool" "github.com/ozontech/file.d/cfg" + "github.com/ozontech/file.d/consts" 
"github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/stats" @@ -241,8 +242,8 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.pool = pool stdBackoff := backoff.NewExponentialBackOff() - stdBackoff.Multiplier = 1.2 - stdBackoff.RandomizationFactor = 0.25 + stdBackoff.Multiplier = consts.ExpBackoffDefaultMultiplier + stdBackoff.RandomizationFactor = consts.ExpBackoffDefaultRndFactor stdBackoff.InitialInterval = p.config.Retention_ stdBackoff.MaxInterval = p.config.Retention_ * 2 diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index e5fd35131..9fe308d34 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -12,6 +12,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/ozontech/file.d/cfg" + "github.com/ozontech/file.d/consts" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/stats" @@ -102,8 +103,8 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP ctx := context.TODO() stdBackoff := backoff.NewExponentialBackOff() - stdBackoff.Multiplier = 1.2 - stdBackoff.RandomizationFactor = 0.25 + stdBackoff.Multiplier = consts.ExpBackoffDefaultMultiplier + stdBackoff.RandomizationFactor = consts.ExpBackoffDefaultRndFactor stdBackoff.InitialInterval = time.Second stdBackoff.MaxInterval = stdBackoff.InitialInterval * 2 p.backoff = backoff.WithContext(stdBackoff, ctx) From 0eafc00ef599cdf881071c9c94f16e401cdafdd4 Mon Sep 17 00:00:00 2001 From: ansakharov Date: Thu, 2 Jun 2022 03:33:17 +0300 Subject: [PATCH 3/5] fix PR from review --- plugin/output/postgres/postgres.go | 66 +++++++++-------- plugin/output/postgres/postgres_test.go | 98 +++++++++++++++++-------- plugin/output/splunk/splunk.go | 36 +++++---- plugin/output/splunk/splunk_test.go | 25 ++++--- 4 files changed, 139 insertions(+), 86 deletions(-) diff --git a/plugin/output/postgres/postgres.go 
b/plugin/output/postgres/postgres.go index 374b7784b..9cab77475 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -8,17 +8,16 @@ import ( "time" sq "github.com/Masterminds/squirrel" - "github.com/cenkalti/backoff/v4" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" - insaneJSON "github.com/vitkovskii/insane-json" - "go.uber.org/zap" - "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/consts" + "github.com/ozontech/file.d/expbackoff" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/stats" + insaneJSON "github.com/vitkovskii/insane-json" + "go.uber.org/zap" ) /*{ introduction @@ -45,6 +44,7 @@ const ( // metrics discardedEventCounter = "event_discarded" duplicatedEventCounter = "event_duplicated" + sendErrorCounter = "send_error" ) type pgType int @@ -71,14 +71,14 @@ type Plugin struct { ctx context.Context cancelFunc context.CancelFunc - backoff backoff.BackOff + backOff *expbackoff.BackOff queryBuilder PgQueryBuilder pool PgxIface } type ConfigColumn struct { Name string `json:"name" required:"true"` - ColumnType string `json:"type" required:"true" options:"int|string|bool|timestamp"` + ColumnType string `json:"type" required:"true" options:"int|string|timestamp"` Unique bool `json:"unique" default:"false"` } @@ -127,14 +127,14 @@ type Config struct { //> //> Timeout for DB requests in milliseconds. //> Timeouts can differ due using exponential backoff. - DBRequestTimeout cfg.Duration `json:"db_request_timeout" default:"3000ms" parse:"duration"` //* - DBRequestTimeout_ time.Duration + RequestTimeout cfg.Duration `json:"db_request_timeout" default:"3000ms" parse:"duration"` //* + RequestTimeout_ time.Duration //> @3@4@5@6 //> //> Timeout for DB health check. 
- DBHealthCheckPeriod cfg.Duration `json:"db_health_check_period" default:"60s" parse:"duration"` //* - DBHealthCheckPeriod_ time.Duration + HealthCheckPeriod cfg.Duration `json:"db_health_check_period" default:"60s" parse:"duration"` //* + HealthCheckPeriod_ time.Duration //> @3@4@5@6 //> @@ -178,8 +178,14 @@ func (p *Plugin) registerPluginMetrics() { Subsystem: subsystemName, Help: "Total pgsql duplicated messages", }) + stats.RegisterCounter(&stats.MetricDesc{ + Name: sendErrorCounter, + Subsystem: subsystemName, + Help: "Total pgsql send errors", + }) } +// Start plugin. func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) { p.controller = params.Controller p.logger = params.Logger @@ -195,10 +201,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP if p.config.Retention_ < 1 { p.logger.Fatal("'renetion' can't be <1") } - if p.config.DBRequestTimeout_ < 1 { + if p.config.RequestTimeout_ < 1 { p.logger.Fatal("'db_request_timeout' can't be <1") } - if p.config.DBHealthCheckPeriod_ < 1 { + if p.config.HealthCheckPeriod_ < 1 { p.logger.Fatal("'db_health_check_period' can't be <1") } @@ -220,16 +226,19 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP } p.pool = pool - stdBackoff := backoff.NewExponentialBackOff() - stdBackoff.Multiplier = consts.ExpBackoffDefaultMultiplier - stdBackoff.RandomizationFactor = consts.ExpBackoffDefaultRndFactor - stdBackoff.InitialInterval = p.config.Retention_ - stdBackoff.MaxInterval = p.config.Retention_ * 2 - - ctxBackoff := backoff.WithContext(stdBackoff, p.ctx) - expBackoff := backoff.WithMaxRetries(ctxBackoff, uint64(p.config.Retry)) - - p.backoff = expBackoff + p.backOff = expbackoff.New( + p.ctx, + stats.GetCounter(subsystemName, discardedEventCounter), + p.config.RequestTimeout_, + expbackoff.RetriesCfg{ + Limited: true, + Limit: uint64(p.config.Retry), + }, + expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), + 
expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), + expbackoff.InitialIntervalOpt(p.config.Retention_), + expbackoff.MaxInterval(p.config.Retention_*2), + ) p.batcher = pipeline.NewBatcher( params.PipelineName, @@ -250,6 +259,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.batcher.Start(ctx) } +// Stop plug. func (p *Plugin) Stop() { p.cancelFunc() p.batcher.Stop() @@ -322,22 +332,18 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) { } // Insert into pg with retry. - err = backoff.Retry(func() error { - ctx, cancel := context.WithTimeout(p.ctx, p.config.DBRequestTimeout_) - defer cancel() - + if err = p.backOff.Exec(p.ctx, func(ctx context.Context) error { p.logger.Info(query, args) rows, err := p.pool.Query(ctx, query, argsSliceInterface...) defer func() { rows.Close() }() if err != nil { - p.logger.Infof("rows: %v, err: %s", rows, err.Error()) + p.logger.Errorf("rows: %v, err: %s", rows, err.Error()) return err } return nil - }, p.backoff) - if err != nil { + }); err != nil { p.pool.Close() p.logger.Fatalf("Failed insert into %s. 
query: %s, args: %v, err: %v", p.config.Table, query, args, err) } @@ -406,7 +412,7 @@ func (p *Plugin) parsePGConfig() (*pgxpool.Config, error) { } pgCfg.LazyConnect = false - pgCfg.HealthCheckPeriod = p.config.DBHealthCheckPeriod_ + pgCfg.HealthCheckPeriod = p.config.HealthCheckPeriod_ return pgCfg, nil } diff --git a/plugin/output/postgres/postgres_test.go b/plugin/output/postgres/postgres_test.go index b78bfae5e..20fa6eaf4 100644 --- a/plugin/output/postgres/postgres_test.go +++ b/plugin/output/postgres/postgres_test.go @@ -7,10 +7,11 @@ import ( "testing" "time" - "github.com/cenkalti/backoff/v4" "github.com/golang/mock/gomock" "github.com/jackc/pgconn" "github.com/jackc/pgproto3/v2" + "github.com/ozontech/file.d/consts" + "github.com/ozontech/file.d/expbackoff" "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" mock_pg "github.com/ozontech/file.d/plugin/output/postgres/mock" @@ -85,11 +86,18 @@ func TestPrivateOut(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) - retryBackoff := backoff.WithMaxRetries( - backoff.WithContext( - backoff.NewExponentialBackOff(), - ctx), - uint64(retryCnt), + backOff := expbackoff.New( + ctx, + stats.GetCounter("random_test", "random_test"), + time.Second*5, + expbackoff.RetriesCfg{ + Limited: true, + Limit: 10, + }, + expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), + expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), + expbackoff.InitialIntervalOpt(time.Second), + expbackoff.MaxInterval(time.Second*2), ) p := &Plugin{ @@ -98,7 +106,7 @@ func TestPrivateOut(t *testing.T) { pool: pool, logger: testLogger, ctx: ctx, - backoff: retryBackoff, + backOff: backOff, } p.registerPluginMetrics() @@ -171,11 +179,18 @@ func TestPrivateOutWithRetry(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) - retryBackoff := backoff.WithMaxRetries( - backoff.WithContext( - backoff.NewExponentialBackOff(), - ctx), - 3, + 
backOff := expbackoff.New( + ctx, + stats.GetCounter("random_test", "random_test"), + time.Second*5, + expbackoff.RetriesCfg{ + Limited: true, + Limit: 10, + }, + expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), + expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), + expbackoff.InitialIntervalOpt(time.Second), + expbackoff.MaxInterval(time.Second*2), ) p := &Plugin{ @@ -184,7 +199,7 @@ func TestPrivateOutWithRetry(t *testing.T) { pool: pool, logger: testLogger, ctx: ctx, - backoff: retryBackoff, + backOff: backOff, } p.registerPluginMetrics() @@ -237,17 +252,24 @@ func TestPrivateOutNoGoodEvents(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) - retryBackoff := backoff.WithMaxRetries( - backoff.WithContext( - backoff.NewExponentialBackOff(), - context.Background()), - uint64(retryCnt), + backOff := expbackoff.New( + context.Background(), + stats.GetCounter("random_test", "random_test"), + time.Second*5, + expbackoff.RetriesCfg{ + Limited: true, + Limit: 10, + }, + expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), + expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), + expbackoff.InitialIntervalOpt(time.Second), + expbackoff.MaxInterval(time.Second*2), ) p := &Plugin{ config: &config, queryBuilder: builder, logger: testLogger, - backoff: retryBackoff, + backOff: backOff, } p.registerPluginMetrics() @@ -325,11 +347,18 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) - retryBackoff := backoff.WithMaxRetries( - backoff.WithContext( - backoff.NewExponentialBackOff(), - ctx), - 3, + backOff := expbackoff.New( + ctx, + stats.GetCounter("random_test", "random_test"), + time.Second*5, + expbackoff.RetriesCfg{ + Limited: true, + Limit: 10, + }, + expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), + expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), + 
expbackoff.InitialIntervalOpt(time.Second), + expbackoff.MaxInterval(time.Second*2), ) p := &Plugin{ config: &config, @@ -337,7 +366,7 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) { pool: pool, logger: testLogger, ctx: ctx, - backoff: retryBackoff, + backOff: backOff, } p.registerPluginMetrics() @@ -508,11 +537,18 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) - retryBackoff := backoff.WithMaxRetries( - backoff.WithContext( - backoff.NewExponentialBackOff(), - ctx), - 3, + backOff := expbackoff.New( + context.Background(), + stats.GetCounter("random_test", "random_test"), + time.Second*5, + expbackoff.RetriesCfg{ + Limited: true, + Limit: 10, + }, + expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), + expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), + expbackoff.InitialIntervalOpt(time.Second), + expbackoff.MaxInterval(time.Second*2), ) p := &Plugin{ config: &config, @@ -520,7 +556,7 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te pool: pool, logger: testLogger, ctx: ctx, - backoff: retryBackoff, + backOff: backOff, } p.registerPluginMetrics() diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index fa5901bbc..89bf363fa 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -10,9 +10,9 @@ import ( "net/http" "time" - "github.com/cenkalti/backoff/v4" "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/consts" + "github.com/ozontech/file.d/expbackoff" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/stats" @@ -32,13 +32,14 @@ const ( ) type Plugin struct { + ctx context.Context config *Config client http.Client logger *zap.SugaredLogger avgEventSize int batcher *pipeline.Batcher controller pipeline.OutputPluginController - backoff backoff.BackOff + backoff *expbackoff.BackOff } 
//! config-params @@ -101,16 +102,19 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.config = config.(*Config) p.client = p.newClient(p.config.RequestTimeout_) - ctx := context.TODO() - stdBackoff := backoff.NewExponentialBackOff() - stdBackoff.Multiplier = consts.ExpBackoffDefaultMultiplier - stdBackoff.RandomizationFactor = consts.ExpBackoffDefaultRndFactor - stdBackoff.InitialInterval = time.Second - stdBackoff.MaxInterval = stdBackoff.InitialInterval * 2 - p.backoff = backoff.WithContext(stdBackoff, ctx) - p.registerPluginMetrics() - + p.ctx = context.TODO() + p.backoff = expbackoff.New( + p.ctx, + stats.GetCounter(subsystemName, sendErrorCounter), + // No concrete time limit here. + time.Hour, + expbackoff.RetriesCfg{Limited: false}, + expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), + expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), + expbackoff.InitialIntervalOpt(time.Second), + expbackoff.MaxInterval(time.Second*2), + ) p.batcher = pipeline.NewBatcher( params.PipelineName, outPluginType, @@ -123,7 +127,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP 0, ) - p.batcher.Start(ctx) + p.batcher.Start(p.ctx) } func (p *Plugin) registerPluginMetrics() { @@ -167,16 +171,16 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { data.outBuf = outBuf p.logger.Debugf("Trying to send: %s", outBuf) - _ = backoff.Retry(func() error { + + _ = p.backoff.Exec(p.ctx, func(ctx context.Context) error { sendErr := p.send(outBuf) if sendErr != nil { - stats.GetCounter(subsystemName, sendErrorCounter).Inc() p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, sendErr.Error()) - return sendErr } return nil - }, p.backoff) + }) + p.logger.Debugf("Successfully sent: %s", outBuf) } diff --git a/plugin/output/splunk/splunk_test.go b/plugin/output/splunk/splunk_test.go index 13eeec267..1d0f24e3d 100644 --- 
a/plugin/output/splunk/splunk_test.go +++ b/plugin/output/splunk/splunk_test.go @@ -8,14 +8,17 @@ import ( "testing" "time" - "github.com/cenkalti/backoff/v4" + "github.com/ozontech/file.d/consts" + "github.com/ozontech/file.d/expbackoff" "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/stats" "github.com/stretchr/testify/assert" insaneJSON "github.com/vitkovskii/insane-json" "go.uber.org/zap" ) func TestSplunk(t *testing.T) { + stats.InitStats() suites := []struct { name string input string @@ -48,18 +51,22 @@ func TestSplunk(t *testing.T) { defer testServer.Close() ctx := context.TODO() - stdBackoff := backoff.NewExponentialBackOff() - stdBackoff.Multiplier = 1.2 - stdBackoff.RandomizationFactor = 0.25 - stdBackoff.InitialInterval = time.Second - stdBackoff.MaxInterval = stdBackoff.InitialInterval * 2 - plugin := Plugin{ + ctx: ctx, config: &Config{ Endpoint: testServer.URL, }, - logger: zap.NewExample().Sugar(), - backoff: backoff.WithContext(stdBackoff, ctx), + logger: zap.NewExample().Sugar(), + backoff: expbackoff.New( + ctx, + stats.GetCounter(subsystemName, sendErrorCounter), + time.Minute, + expbackoff.RetriesCfg{Limited: false}, + expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), + expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), + expbackoff.InitialIntervalOpt(time.Second), + expbackoff.MaxInterval(time.Second*2), + ), } batch := pipeline.Batch{ From ec850689ed7e0b59f85845bea7b569d7432dc7b0 Mon Sep 17 00:00:00 2001 From: ansakharov Date: Fri, 3 Jun 2022 11:31:21 +0300 Subject: [PATCH 4/5] PR fixes --- backoff/backoff.go | 106 ++++++++++ backoff/backoff_test.go | 250 ++++++++++++++++++++++++ consts/consts.go | 6 - plugin/output/postgres/postgres.go | 28 +-- plugin/output/postgres/postgres_test.go | 63 +++--- plugin/output/splunk/splunk.go | 19 +- plugin/output/splunk/splunk_test.go | 15 +- 7 files changed, 417 insertions(+), 70 deletions(-) create mode 100644 backoff/backoff.go create mode 100644 
backoff/backoff_test.go delete mode 100644 consts/consts.go diff --git a/backoff/backoff.go b/backoff/backoff.go new file mode 100644 index 000000000..42c39b98e --- /dev/null +++ b/backoff/backoff.go @@ -0,0 +1,106 @@ +package backoff + +import ( + "context" + "fmt" + "time" + + "github.com/cenkalti/backoff/v4" + prom "github.com/prometheus/client_golang/prometheus" +) + +const ( + ExpBackoffDefaultMultiplier = 1.2 + ExpBackoffDefaultRndFactor = 0.25 +) + +// BackOff is a wrapper that provides retry mechanism. +type BackOff struct { + counter prom.Counter + backoff backoff.BackOff + timeout time.Duration +} + +// RetriesCfg desribes retries count. +type RetriesCfg struct { + Limited bool + Limit uint64 +} + +// New instance of Backoff. +func New( + ctx context.Context, + counter prom.Counter, + timeout time.Duration, + retriesCfg RetriesCfg, + opts ...Option, +) *BackOff { + var expBackoff backoff.BackOff + expBackoff = backoff.WithContext(new(opts...), ctx) + if retriesCfg.Limited { + expBackoff = backoff.WithMaxRetries(expBackoff, retriesCfg.Limit) + } + + return &BackOff{ + counter: counter, + backoff: expBackoff, + timeout: timeout, + } +} + +// RetryWithMetrics processes given lambda and increments error metric on fail. +func (b *BackOff) RetryWithMetrics(ctx context.Context, executor func(ctx context.Context) error) error { + err := backoff.Retry(func() error { + ctx, cancel := context.WithTimeout(ctx, b.timeout) + defer cancel() + + if execErr := executor(ctx); execErr != nil { + b.counter.Inc() + return execErr + } + return nil + }, b.backoff) + return err +} + +// New returns exponential backoff. +func new(opts ...Option) *backoff.ExponentialBackOff { + backoff := backoff.NewExponentialBackOff() + for _, opt := range opts { + fmt.Println("op", opt) + opt(backoff) + } + fmt.Println(backoff) + return backoff +} + +// Option confugures backoff. +type Option func(*backoff.ExponentialBackOff) + +// InitialIntervalOpt set interval. 
+func InitialIntervalOpt(initInterval time.Duration) Option { + return func(expBackoff *backoff.ExponentialBackOff) { + expBackoff.InitialInterval = initInterval + } +} + +// RandomizationFactor set rand factor. +func RandomizationFactor(factor float64) Option { + return func(expBackoff *backoff.ExponentialBackOff) { + expBackoff.RandomizationFactor = factor + } +} + +// Multiplier sets mult. +func Multiplier(multiplier float64) Option { + return func(expBackoff *backoff.ExponentialBackOff) { + expBackoff.Multiplier = multiplier + } +} + +// MaxInterval set max interval. +func MaxInterval(maxInterval time.Duration) Option { + return func(expBackoff *backoff.ExponentialBackOff) { + expBackoff.MaxInterval = maxInterval + } +} diff --git a/backoff/backoff_test.go b/backoff/backoff_test.go new file mode 100644 index 000000000..546366736 --- /dev/null +++ b/backoff/backoff_test.go @@ -0,0 +1,250 @@ +package backoff + +import ( + "context" + "errors" + "reflect" + "sync" + "testing" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/ozontech/file.d/stats" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func TestNew(t *testing.T) { + stats.InitStats() + + tCases := []struct { + name string + opts []Option + counter prom.Counter + timeout time.Duration + retriesCfg RetriesCfg + expParams []interface{} + expInitInterval time.Duration + expRandFactor float64 + expMulti float64 + expMaxInterval time.Duration + }{ + { + name: "initial_interval_opt", + opts: []Option{ + InitialIntervalOpt(time.Nanosecond * 20), + }, + counter: stats.GetCounter("random", "random"), + timeout: time.Second, + retriesCfg: RetriesCfg{Limited: false, Limit: 0}, + expInitInterval: time.Nanosecond * 20, + expRandFactor: backoff.DefaultRandomizationFactor, + expMulti: backoff.DefaultMultiplier, + expMaxInterval: backoff.DefaultMaxInterval, + }, + { + name: "initial_interval_opt", + opts: []Option{ + InitialIntervalOpt(time.Nanosecond * 20), 
+ }, + counter: stats.GetCounter("random", "random"), + timeout: time.Second, + retriesCfg: RetriesCfg{Limited: false, Limit: 0}, + expInitInterval: time.Nanosecond * 20, + expRandFactor: backoff.DefaultRandomizationFactor, + expMulti: backoff.DefaultMultiplier, + expMaxInterval: backoff.DefaultMaxInterval, + }, + { + name: "randomization_factor_opt", + opts: []Option{ + RandomizationFactor(5.5), + }, + counter: stats.GetCounter("random", "random"), + timeout: time.Second, + retriesCfg: RetriesCfg{Limited: false, Limit: 0}, + expInitInterval: backoff.DefaultInitialInterval, + expRandFactor: 5.5, + expMulti: backoff.DefaultMultiplier, + expMaxInterval: backoff.DefaultMaxInterval, + }, + { + name: "multiplier_opt", + opts: []Option{ + Multiplier(4.4), + }, + counter: stats.GetCounter("random", "random"), + timeout: time.Second, + retriesCfg: RetriesCfg{Limited: false, Limit: 0}, + expInitInterval: backoff.DefaultInitialInterval, + expRandFactor: backoff.DefaultRandomizationFactor, + expMulti: 4.4, + expMaxInterval: backoff.DefaultMaxInterval, + }, + { + name: "max_interval_opt", + opts: []Option{ + MaxInterval(time.Nanosecond * 44), + }, + counter: stats.GetCounter("random", "random"), + timeout: time.Second, + retriesCfg: RetriesCfg{Limited: false, Limit: 0}, + expInitInterval: backoff.DefaultInitialInterval, + expRandFactor: backoff.DefaultRandomizationFactor, + expMulti: backoff.DefaultMultiplier, + expMaxInterval: time.Nanosecond * 44, + }, + { + name: "all_opt", + opts: []Option{ + InitialIntervalOpt(time.Nanosecond * 20), + RandomizationFactor(2.2), + Multiplier(8.8), + MaxInterval(time.Microsecond * 3), + }, + counter: stats.GetCounter("random", "random"), + timeout: time.Second, + retriesCfg: RetriesCfg{Limited: false, Limit: 0}, + expInitInterval: time.Nanosecond * 20, + expRandFactor: 2.2, + expMulti: 8.8, + expMaxInterval: time.Microsecond * 3, + }, + } + + for _, tCase := range tCases { + t.Run(tCase.name, func(t *testing.T) { + expBackoff := 
New(context.Background(), tCase.counter, tCase.timeout, tCase.retriesCfg, tCase.opts...) + backoffCtx := expBackoff.backoff.(backoff.BackOffContext) + + // get the backoff.ExponentialBackOff held in the BackOff field of the unexported backoff.backOffContext + underlyingBackoff := reflect.Indirect(reflect.ValueOf(backoffCtx)).FieldByName("BackOff").Interface().(*backoff.ExponentialBackOff) + require.Equal(t, tCase.expInitInterval, underlyingBackoff.InitialInterval) + require.Equal(t, tCase.expRandFactor, underlyingBackoff.RandomizationFactor) + require.Equal(t, tCase.expMulti, underlyingBackoff.Multiplier) + require.Equal(t, tCase.expMaxInterval, underlyingBackoff.MaxInterval) + }) + } +} + +func TestNewWithRetries(t *testing.T) { + limit := uint64(10) + expBackoff := New(context.Background(), nil, time.Nanosecond, RetriesCfg{Limited: true, Limit: limit}) + tries := reflect.Indirect(reflect.ValueOf(expBackoff.backoff)).FieldByName("maxTries").Uint() + + require.Equal(t, limit, tries) +} + +func TestExec(t *testing.T) { + stats.InitStats() + stats.RegisterCounter(&stats.MetricDesc{ + Subsystem: "backoff_subsys", + Name: "backoff_cnt_test", + Help: "For tests", + }) + counter := stats.GetCounter("backoff_subsys", "backoff_cnt_test") + + ctx := context.Background() + + retry := 10 + expBackoff := New(context.Background(), counter, time.Second, RetriesCfg{Limited: true, Limit: uint64(retry)}) + + var wg sync.WaitGroup + wg.Add(1) + executor := func(context.Context) error { + defer wg.Done() + return nil + } + + var err error + func() { + err = expBackoff.RetryWithMetrics(ctx, executor) + }() + + wg.Wait() + require.NoError(t, err) +} + +func TestExecError(t *testing.T) { + stats.InitStats() + stats.RegisterCounter(&stats.MetricDesc{ + Subsystem: "backoff_subsys", + Name: "backoff_cnt_test", + Help: "For tests", + }) + counter := stats.GetCounter("backoff_subsys", "backoff_cnt_test") + + expErr := errors.New("some error") + + ctx := context.Background() + retry := 10 + expBackoff := New( + 
context.Background(), + counter, + time.Second, + RetriesCfg{Limited: true, Limit: uint64(retry) - 1}, + InitialIntervalOpt(time.Nanosecond), + MaxInterval(time.Nanosecond), + ) + + var wg sync.WaitGroup + wg.Add(retry) + executor := func(context.Context) error { + defer wg.Done() + return expErr + } + + var err error + func() { + err = expBackoff.RetryWithMetrics(ctx, executor) + }() + + wg.Wait() + require.Error(t, err) + require.EqualError(t, expErr, err.Error()) +} + +func TestExecSuccessAfterRetry(t *testing.T) { + stats.InitStats() + stats.RegisterCounter(&stats.MetricDesc{ + Subsystem: "backoff_subsys", + Name: "backoff_cnt_test", + Help: "For tests", + }) + counter := stats.GetCounter("backoff_subsys", "backoff_cnt_test") + + expErr := errors.New("some error") + + ctx := context.Background() + + expBackoff := New( + context.Background(), + counter, + time.Second, + RetriesCfg{Limited: false}, + InitialIntervalOpt(time.Nanosecond), + MaxInterval(time.Nanosecond), + ) + + successAfter := 10 + i := 0 + var wg sync.WaitGroup + wg.Add(successAfter + 1) + executor := func(context.Context) error { + defer wg.Done() + + if i >= successAfter { + return nil + } + i++ + return expErr + } + + var err error + func() { + err = expBackoff.RetryWithMetrics(ctx, executor) + }() + + wg.Wait() + require.NoError(t, err) +} diff --git a/consts/consts.go b/consts/consts.go deleted file mode 100644 index aae1f4998..000000000 --- a/consts/consts.go +++ /dev/null @@ -1,6 +0,0 @@ -package consts - -const ( - ExpBackoffDefaultMultiplier = 1.2 - ExpBackoffDefaultRndFactor = 0.25 -) diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index 9cab77475..868a2d679 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -10,9 +10,8 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" + "github.com/ozontech/file.d/backoff" "github.com/ozontech/file.d/cfg" - 
"github.com/ozontech/file.d/consts" - "github.com/ozontech/file.d/expbackoff" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/stats" @@ -71,7 +70,7 @@ type Plugin struct { ctx context.Context cancelFunc context.CancelFunc - backOff *expbackoff.BackOff + backOff *backoff.BackOff queryBuilder PgQueryBuilder pool PgxIface } @@ -226,18 +225,18 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP } p.pool = pool - p.backOff = expbackoff.New( + p.backOff = backoff.New( p.ctx, stats.GetCounter(subsystemName, discardedEventCounter), p.config.RequestTimeout_, - expbackoff.RetriesCfg{ + backoff.RetriesCfg{ Limited: true, Limit: uint64(p.config.Retry), }, - expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), - expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), - expbackoff.InitialIntervalOpt(p.config.Retention_), - expbackoff.MaxInterval(p.config.Retention_*2), + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(p.config.Retention_), + backoff.MaxInterval(p.config.Retention_*2), ) p.batcher = pipeline.NewBatcher( @@ -332,20 +331,21 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) { } // Insert into pg with retry. - if err = p.backOff.Exec(p.ctx, func(ctx context.Context) error { - p.logger.Info(query, args) + if err = p.backOff.RetryWithMetrics(p.ctx, func(ctx context.Context) error { rows, err := p.pool.Query(ctx, query, argsSliceInterface...) defer func() { - rows.Close() + if rows != nil { + rows.Close() + } }() if err != nil { - p.logger.Errorf("rows: %v, err: %s", rows, err.Error()) + p.logger.Errorf("can't insert query: %s, args: %v: %s", query, args, err.Error()) return err } return nil }); err != nil { p.pool.Close() - p.logger.Fatalf("Failed insert into %s. 
query: %s, args: %v, err: %v", p.config.Table, query, args, err) + p.logger.Fatalf("can't insert. query: %s, args: %v: %s", query, args, err.Error()) } } diff --git a/plugin/output/postgres/postgres_test.go b/plugin/output/postgres/postgres_test.go index 20fa6eaf4..44e838b17 100644 --- a/plugin/output/postgres/postgres_test.go +++ b/plugin/output/postgres/postgres_test.go @@ -10,8 +10,7 @@ import ( "github.com/golang/mock/gomock" "github.com/jackc/pgconn" "github.com/jackc/pgproto3/v2" - "github.com/ozontech/file.d/consts" - "github.com/ozontech/file.d/expbackoff" + "github.com/ozontech/file.d/backoff" "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" mock_pg "github.com/ozontech/file.d/plugin/output/postgres/mock" @@ -86,18 +85,18 @@ func TestPrivateOut(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) - backOff := expbackoff.New( + backOff := backoff.New( ctx, stats.GetCounter("random_test", "random_test"), time.Second*5, - expbackoff.RetriesCfg{ + backoff.RetriesCfg{ Limited: true, Limit: 10, }, - expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), - expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), - expbackoff.InitialIntervalOpt(time.Second), - expbackoff.MaxInterval(time.Second*2), + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), ) p := &Plugin{ @@ -179,18 +178,18 @@ func TestPrivateOutWithRetry(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) - backOff := expbackoff.New( + backOff := backoff.New( ctx, stats.GetCounter("random_test", "random_test"), time.Second*5, - expbackoff.RetriesCfg{ + backoff.RetriesCfg{ Limited: true, Limit: 10, }, - expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), - expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), - 
expbackoff.InitialIntervalOpt(time.Second), - expbackoff.MaxInterval(time.Second*2), + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), ) p := &Plugin{ @@ -252,18 +251,18 @@ func TestPrivateOutNoGoodEvents(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) - backOff := expbackoff.New( + backOff := backoff.New( context.Background(), stats.GetCounter("random_test", "random_test"), time.Second*5, - expbackoff.RetriesCfg{ + backoff.RetriesCfg{ Limited: true, Limit: 10, }, - expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), - expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), - expbackoff.InitialIntervalOpt(time.Second), - expbackoff.MaxInterval(time.Second*2), + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), ) p := &Plugin{ config: &config, @@ -347,18 +346,18 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) { builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) - backOff := expbackoff.New( + backOff := backoff.New( ctx, stats.GetCounter("random_test", "random_test"), time.Second*5, - expbackoff.RetriesCfg{ + backoff.RetriesCfg{ Limited: true, Limit: 10, }, - expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), - expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), - expbackoff.InitialIntervalOpt(time.Second), - expbackoff.MaxInterval(time.Second*2), + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), ) p := &Plugin{ config: &config, @@ -537,18 +536,18 @@ func 
TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te builder, err := NewQueryBuilder(columns, table) require.NoError(t, err) - backOff := expbackoff.New( + backOff := backoff.New( context.Background(), stats.GetCounter("random_test", "random_test"), time.Second*5, - expbackoff.RetriesCfg{ + backoff.RetriesCfg{ Limited: true, Limit: 10, }, - expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), - expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), - expbackoff.InitialIntervalOpt(time.Second), - expbackoff.MaxInterval(time.Second*2), + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), ) p := &Plugin{ config: &config, diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index 89bf363fa..d4f5ab736 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -10,9 +10,8 @@ import ( "net/http" "time" + "github.com/ozontech/file.d/backoff" "github.com/ozontech/file.d/cfg" - "github.com/ozontech/file.d/consts" - "github.com/ozontech/file.d/expbackoff" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/stats" @@ -39,7 +38,7 @@ type Plugin struct { avgEventSize int batcher *pipeline.Batcher controller pipeline.OutputPluginController - backoff *expbackoff.BackOff + backoff *backoff.BackOff } //! config-params @@ -104,16 +103,16 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.registerPluginMetrics() p.ctx = context.TODO() - p.backoff = expbackoff.New( + p.backoff = backoff.New( p.ctx, stats.GetCounter(subsystemName, sendErrorCounter), // No concrete time limit here. 
time.Hour, - expbackoff.RetriesCfg{Limited: false}, - expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), - expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), - expbackoff.InitialIntervalOpt(time.Second), - expbackoff.MaxInterval(time.Second*2), + backoff.RetriesCfg{Limited: false}, + backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), ) p.batcher = pipeline.NewBatcher( params.PipelineName, @@ -172,7 +171,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { p.logger.Debugf("Trying to send: %s", outBuf) - _ = p.backoff.Exec(p.ctx, func(ctx context.Context) error { + _ = p.backoff.RetryWithMetrics(p.ctx, func(ctx context.Context) error { sendErr := p.send(outBuf) if sendErr != nil { p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, sendErr.Error()) diff --git a/plugin/output/splunk/splunk_test.go b/plugin/output/splunk/splunk_test.go index 1d0f24e3d..10cec0f63 100644 --- a/plugin/output/splunk/splunk_test.go +++ b/plugin/output/splunk/splunk_test.go @@ -8,8 +8,7 @@ import ( "testing" "time" - "github.com/ozontech/file.d/consts" - "github.com/ozontech/file.d/expbackoff" + "github.com/ozontech/file.d/backoff" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/stats" "github.com/stretchr/testify/assert" @@ -57,15 +56,15 @@ func TestSplunk(t *testing.T) { Endpoint: testServer.URL, }, logger: zap.NewExample().Sugar(), - backoff: expbackoff.New( + backoff: backoff.New( ctx, stats.GetCounter(subsystemName, sendErrorCounter), time.Minute, - expbackoff.RetriesCfg{Limited: false}, - expbackoff.Multiplier(consts.ExpBackoffDefaultMultiplier), - expbackoff.RandomizationFactor(consts.ExpBackoffDefaultRndFactor), - expbackoff.InitialIntervalOpt(time.Second), - expbackoff.MaxInterval(time.Second*2), + backoff.RetriesCfg{Limited: false}, 
+ backoff.Multiplier(backoff.ExpBackoffDefaultMultiplier), + backoff.RandomizationFactor(backoff.ExpBackoffDefaultRndFactor), + backoff.InitialIntervalOpt(time.Second), + backoff.MaxInterval(time.Second*2), ), } From 9a812737733e17816b548008065e5b85eb3d2588 Mon Sep 17 00:00:00 2001 From: ansakharov Date: Tue, 28 Jun 2022 02:17:04 +0300 Subject: [PATCH 5/5] fix PR during review --- backoff/backoff.go | 3 --- plugin/output/postgres/postgres.go | 8 ++++---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/backoff/backoff.go b/backoff/backoff.go index 42c39b98e..d264edc18 100644 --- a/backoff/backoff.go +++ b/backoff/backoff.go @@ -2,7 +2,6 @@ package backoff import ( "context" - "fmt" "time" "github.com/cenkalti/backoff/v4" @@ -67,10 +66,8 @@ func (b *BackOff) RetryWithMetrics(ctx context.Context, executor func(ctx contex func new(opts ...Option) *backoff.ExponentialBackOff { backoff := backoff.NewExponentialBackOff() for _, opt := range opts { - fmt.Println("op", opt) opt(backoff) } - fmt.Println(backoff) return backoff } diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index 868a2d679..884fbfc8c 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -132,8 +132,8 @@ type Config struct { //> @3@4@5@6 //> //> Timeout for DB health check. 
- HealthCheckPeriod cfg.Duration `json:"db_health_check_period" default:"60s" parse:"duration"` //* - HealthCheckPeriod_ time.Duration + HealthCheckInterval cfg.Duration `json:"db_health_check_interval" default:"60s" parse:"duration"` //* + HealthCheckInterval_ time.Duration //> @3@4@5@6 //> @@ -203,7 +203,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP if p.config.RequestTimeout_ < 1 { p.logger.Fatal("'db_request_timeout' can't be <1") } - if p.config.HealthCheckPeriod_ < 1 { + if p.config.HealthCheckInterval_ < 1 { p.logger.Fatal("'db_health_check_period' can't be <1") } @@ -412,7 +412,7 @@ func (p *Plugin) parsePGConfig() (*pgxpool.Config, error) { } pgCfg.LazyConnect = false - pgCfg.HealthCheckPeriod = p.config.HealthCheckPeriod_ + pgCfg.HealthCheckPeriod = p.config.HealthCheckInterval_ return pgCfg, nil }