Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Add exponential backoff for pg output plugin #74

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions consts/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package consts

const (
ExpBackoffDefaultMultiplier = 1.2
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
ExpBackoffDefaultRndFactor = 0.25
)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d
github.com/bitly/go-simplejson v0.5.0
github.com/cenkalti/backoff/v4 v4.1.2
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
github.com/euank/go-kmsg-parser v2.0.0+incompatible
github.com/ghodss/yaml v1.0.0
github.com/golang/mock v1.6.0
Expand Down Expand Up @@ -36,7 +37,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/cenkalti/backoff/v3 v3.0.0 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
40 changes: 23 additions & 17 deletions plugin/output/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"time"

sq "github.com/Masterminds/squirrel"
"github.com/cenkalti/backoff/v4"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/consts"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/stats"
Expand Down Expand Up @@ -68,6 +70,7 @@ type Plugin struct {
ctx context.Context
cancelFunc context.CancelFunc

backoff backoff.BackOff
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
queryBuilder PgQueryBuilder
pool PgxIface
}
Expand Down Expand Up @@ -138,6 +141,7 @@ type Config struct {
//> @3@4@5@6
//>
//> Timeout for DB requests in milliseconds.
//> Timeouts can differ due using exponential backoff.
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
DBRequestTimeout cfg.Duration `json:"db_request_timeout" default:"3000ms" parse:"duration"` //*
DBRequestTimeout_ time.Duration

Expand Down Expand Up @@ -241,6 +245,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
}
p.pool = pool

stdBackoff := backoff.NewExponentialBackOff()
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
stdBackoff.Multiplier = consts.ExpBackoffDefaultMultiplier
stdBackoff.RandomizationFactor = consts.ExpBackoffDefaultRndFactor
stdBackoff.InitialInterval = p.config.Retention_
stdBackoff.MaxInterval = p.config.Retention_ * 2

ctxBackoff := backoff.WithContext(stdBackoff, p.ctx)
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
expBackoff := backoff.WithMaxRetries(ctxBackoff, uint64(p.config.Retry))

p.backoff = expBackoff

p.batcher = pipeline.NewBatcher(
params.PipelineName,
outPluginType,
Expand Down Expand Up @@ -331,34 +346,25 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) {
argsSliceInterface[i] = args[i-1]
}

var ctx context.Context
var cancel context.CancelFunc
var outErr error
// Insert into pg with retry.
for i := p.config.Retry; i > 0; i-- {
ctx, cancel = context.WithTimeout(p.ctx, p.config.DBRequestTimeout_)
err = backoff.Retry(func() error {
ctx, cancel := context.WithTimeout(p.ctx, p.config.DBRequestTimeout_)
defer cancel()

p.logger.Info(query, args)
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
rows, err := p.pool.Query(ctx, query, argsSliceInterface...)
defer func() {
rows.Close()
}()
if err != nil {
outErr = err
p.logger.Infof("rows: %v, err: %s", rows, err.Error())
cancel()
time.Sleep(p.config.Retention_)
continue
} else {
outErr = nil
break
return err
}
}
cancel()

if outErr != nil {
return nil
}, p.backoff)
if err != nil {
p.pool.Close()
p.logger.Fatalf("Failed insert into %s. query: %s, args: %v, err: %v", p.config.Table, query, args, outErr)
p.logger.Fatalf("Failed insert into %s. query: %s, args: %v, err: %v", p.config.Table, query, args, err)
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
53 changes: 48 additions & 5 deletions plugin/output/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/golang/mock/gomock"
"github.com/jackc/pgconn"
"github.com/jackc/pgproto3/v2"
Expand Down Expand Up @@ -61,9 +62,10 @@ func TestPrivateOut(t *testing.T) {

table := "table1"

retryCnt := 0
config := Config{
Columns: columns,
Retry: 3,
Retry: retryCnt,
}

ctl := gomock.NewController(t)
Expand All @@ -83,12 +85,20 @@ func TestPrivateOut(t *testing.T) {
builder, err := NewQueryBuilder(columns, table)
require.NoError(t, err)

retryBackoff := backoff.WithMaxRetries(
backoff.WithContext(
backoff.NewExponentialBackOff(),
ctx),
uint64(retryCnt),
)

p := &Plugin{
config: &config,
queryBuilder: builder,
pool: pool,
logger: testLogger,
ctx: ctx,
backoff: retryBackoff,
}

p.registerPluginMetrics()
Expand Down Expand Up @@ -133,9 +143,10 @@ func TestPrivateOutWithRetry(t *testing.T) {

table := "table1"

retryCnt := 3
config := Config{
Columns: columns,
Retry: 3,
Retry: retryCnt,
}

ctl := gomock.NewController(t)
Expand All @@ -160,12 +171,20 @@ func TestPrivateOutWithRetry(t *testing.T) {
builder, err := NewQueryBuilder(columns, table)
require.NoError(t, err)

retryBackoff := backoff.WithMaxRetries(
backoff.WithContext(
backoff.NewExponentialBackOff(),
ctx),
3,
)

p := &Plugin{
config: &config,
queryBuilder: builder,
pool: pool,
logger: testLogger,
ctx: ctx,
backoff: retryBackoff,
}

p.registerPluginMetrics()
Expand Down Expand Up @@ -209,18 +228,26 @@ func TestPrivateOutNoGoodEvents(t *testing.T) {

table := "table1"

retryCnt := 0
config := Config{
Columns: columns,
Retry: 3,
Retry: retryCnt,
}

builder, err := NewQueryBuilder(columns, table)
require.NoError(t, err)

retryBackoff := backoff.WithMaxRetries(
backoff.WithContext(
backoff.NewExponentialBackOff(),
context.Background()),
uint64(retryCnt),
)
p := &Plugin{
config: &config,
queryBuilder: builder,
logger: testLogger,
backoff: retryBackoff,
}

p.registerPluginMetrics()
Expand Down Expand Up @@ -275,9 +302,10 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) {

table := "table1"

retryCnt := 1
config := Config{
Columns: columns,
Retry: 3,
Retry: retryCnt,
}

ctl := gomock.NewController(t)
Expand All @@ -297,12 +325,19 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) {
builder, err := NewQueryBuilder(columns, table)
require.NoError(t, err)

retryBackoff := backoff.WithMaxRetries(
backoff.WithContext(
backoff.NewExponentialBackOff(),
ctx),
3,
)
p := &Plugin{
config: &config,
queryBuilder: builder,
pool: pool,
logger: testLogger,
ctx: ctx,
backoff: retryBackoff,
}

p.registerPluginMetrics()
Expand Down Expand Up @@ -449,9 +484,10 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te

table := "table1"

retryCnt := 999
config := Config{
Columns: columns,
Retry: 3,
Retry: retryCnt,
}

ctl := gomock.NewController(t)
Expand All @@ -472,12 +508,19 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te
builder, err := NewQueryBuilder(columns, table)
require.NoError(t, err)

retryBackoff := backoff.WithMaxRetries(
backoff.WithContext(
backoff.NewExponentialBackOff(),
ctx),
3,
)
p := &Plugin{
config: &config,
queryBuilder: builder,
pool: pool,
logger: testLogger,
ctx: ctx,
backoff: retryBackoff,
}

p.registerPluginMetrics()
Expand Down
30 changes: 19 additions & 11 deletions plugin/output/splunk/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"net/http"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/consts"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/stats"
Expand All @@ -36,6 +38,7 @@ type Plugin struct {
avgEventSize int
batcher *pipeline.Batcher
controller pipeline.OutputPluginController
backoff backoff.BackOff
}

//! config-params
Expand Down Expand Up @@ -98,6 +101,14 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.config = config.(*Config)
p.client = p.newClient(p.config.RequestTimeout_)

ctx := context.TODO()
andrewmed marked this conversation as resolved.
Show resolved Hide resolved
stdBackoff := backoff.NewExponentialBackOff()
stdBackoff.Multiplier = consts.ExpBackoffDefaultMultiplier
stdBackoff.RandomizationFactor = consts.ExpBackoffDefaultRndFactor
stdBackoff.InitialInterval = time.Second
stdBackoff.MaxInterval = stdBackoff.InitialInterval * 2
p.backoff = backoff.WithContext(stdBackoff, ctx)

p.registerPluginMetrics()

p.batcher = pipeline.NewBatcher(
Expand All @@ -112,7 +123,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
0,
)

p.batcher.Start(context.TODO())
p.batcher.Start(ctx)
}

func (p *Plugin) registerPluginMetrics() {
Expand Down Expand Up @@ -156,19 +167,16 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
data.outBuf = outBuf

p.logger.Debugf("Trying to send: %s", outBuf)

for {
err := p.send(outBuf)
if err != nil {
_ = backoff.Retry(func() error {
sendErr := p.send(outBuf)
if sendErr != nil {
stats.GetCounter(subsystemName, sendErrorCounter).Inc()
p.logger.Errorf("Can't send data to splunk address=%s: %s", p.config.Endpoint, err.Error())
time.Sleep(time.Second)
p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, sendErr.Error())

continue
return sendErr
}

break
}
return nil
}, p.backoff)
p.logger.Debugf("Successfully sent: %s", outBuf)
}

Expand Down
13 changes: 12 additions & 1 deletion plugin/output/splunk/splunk_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package splunk

import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ozontech/file.d/pipeline"
"github.com/stretchr/testify/assert"
insaneJSON "github.com/vitkovskii/insane-json"
Expand Down Expand Up @@ -44,11 +47,19 @@ func TestSplunk(t *testing.T) {
}))
defer testServer.Close()

ctx := context.TODO()
stdBackoff := backoff.NewExponentialBackOff()
stdBackoff.Multiplier = 1.2
stdBackoff.RandomizationFactor = 0.25
stdBackoff.InitialInterval = time.Second
stdBackoff.MaxInterval = stdBackoff.InitialInterval * 2

plugin := Plugin{
config: &Config{
Endpoint: testServer.URL,
},
logger: zap.NewExample().Sugar(),
logger: zap.NewExample().Sugar(),
backoff: backoff.WithContext(stdBackoff, ctx),
}

batch := pipeline.Batch{
Expand Down