Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Add exponential backoff for pg output plugin #74

Closed
wants to merge 7 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add exponential backoff for pg output plugin
  • Loading branch information
ansakharov committed Apr 26, 2022
commit b863c884c770d4c37713997b83d6c30f989ed715
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VERSION ?= v0.5.3
VERSION ?= v0.5.4
UPSTREAM_BRANCH ?= origin/master

.PHONY: prepare
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ require (
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d
github.com/bitly/go-simplejson v0.5.0
github.com/cenkalti/backoff/v4 v4.1.2
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
github.com/euank/go-kmsg-parser v2.0.0+incompatible
github.com/ghodss/yaml v1.0.0
github.com/golang/mock v1.6.0
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -40,6 +40,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
39 changes: 22 additions & 17 deletions plugin/output/postgres/postgres.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ import (
"time"

sq "github.com/Masterminds/squirrel"
"github.com/cenkalti/backoff/v4"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/ozontech/file.d/cfg"
@@ -65,6 +66,7 @@ type Plugin struct {
ctx context.Context
cancelFunc context.CancelFunc

backoff backoff.BackOff
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
queryBuilder PgQueryBuilder
pool PgxIface
}
@@ -238,6 +240,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
}
p.pool = pool

stdBackoff := backoff.NewExponentialBackOff()
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
stdBackoff.Multiplier = 1.2
stdBackoff.RandomizationFactor = 0.25
stdBackoff.InitialInterval = p.config.Retention_
stdBackoff.MaxInterval = p.config.Retention_ * 2

ctxBackoff := backoff.WithContext(stdBackoff, p.ctx)
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
expBackoff := backoff.WithMaxRetries(ctxBackoff, uint64(p.config.Retry))

p.backoff = expBackoff
ansakharov marked this conversation as resolved.
Show resolved Hide resolved

p.batcher = pipeline.NewBatcher(
params.PipelineName,
outPluginType,
@@ -321,31 +334,23 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) {
p.logger.Fatalf("Invalid SQL. query: %s, args: %v, err: %v", query, args, err)
}

var ctx context.Context
var cancel context.CancelFunc
var outErr error
// Insert into pg with retry.
for i := p.config.Retry; i > 0; i-- {
ctx, cancel = context.WithTimeout(p.ctx, p.config.DBRequestTimeout_)
err = backoff.Retry(func() error {
ctx, cancel := context.WithTimeout(p.ctx, p.config.DBRequestTimeout_)
defer cancel()

p.logger.Info(query, args)
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
pgCommResult, err := p.pool.Exec(ctx, query, args...)
if err != nil {
outErr = err
p.logger.Infof("pgCommResult: %v, err: %s", pgCommResult, err.Error())
cancel()
time.Sleep(p.config.Retention_)
continue
} else {
outErr = nil
break
}
}
cancel()

if outErr != nil {
return err
}
return nil
}, p.backoff)
if err != nil {
p.pool.Close()
p.logger.Fatalf("Failed insert into %s. query: %s, args: %v, err: %v", p.config.Table, query, args, outErr)
p.logger.Fatalf("Failed insert into %s. query: %s, args: %v, err: %v", p.config.Table, query, args, err)
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
ansakharov marked this conversation as resolved.
Show resolved Hide resolved
}
}

53 changes: 48 additions & 5 deletions plugin/output/postgres/postgres_test.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/golang/mock/gomock"
"github.com/jackc/pgconn"
"github.com/ozontech/file.d/logger"
@@ -60,9 +61,10 @@ func TestPrivateOut(t *testing.T) {

table := "table1"

retryCnt := 0
config := Config{
Columns: columns,
Retry: 3,
Retry: retryCnt,
}

ctl := gomock.NewController(t)
@@ -82,12 +84,20 @@ func TestPrivateOut(t *testing.T) {
builder, err := NewQueryBuilder(columns, table)
require.NoError(t, err)

retryBackoff := backoff.WithMaxRetries(
backoff.WithContext(
backoff.NewExponentialBackOff(),
ctx),
uint64(retryCnt),
)

p := &Plugin{
config: &config,
queryBuilder: builder,
pool: pool,
logger: testLogger,
ctx: ctx,
backoff: retryBackoff,
}

p.registerPluginMetrics()
@@ -132,9 +142,10 @@ func TestPrivateOutWithRetry(t *testing.T) {

table := "table1"

retryCnt := 3
config := Config{
Columns: columns,
Retry: 3,
Retry: retryCnt,
}

ctl := gomock.NewController(t)
@@ -159,12 +170,20 @@ func TestPrivateOutWithRetry(t *testing.T) {
builder, err := NewQueryBuilder(columns, table)
require.NoError(t, err)

retryBackoff := backoff.WithMaxRetries(
backoff.WithContext(
backoff.NewExponentialBackOff(),
ctx),
3,
)

p := &Plugin{
config: &config,
queryBuilder: builder,
pool: pool,
logger: testLogger,
ctx: ctx,
backoff: retryBackoff,
}

p.registerPluginMetrics()
@@ -208,18 +227,26 @@ func TestPrivateOutNoGoodEvents(t *testing.T) {

table := "table1"

retryCnt := 0
config := Config{
Columns: columns,
Retry: 3,
Retry: retryCnt,
}

builder, err := NewQueryBuilder(columns, table)
require.NoError(t, err)

retryBackoff := backoff.WithMaxRetries(
backoff.WithContext(
backoff.NewExponentialBackOff(),
context.Background()),
uint64(retryCnt),
)
p := &Plugin{
config: &config,
queryBuilder: builder,
logger: testLogger,
backoff: retryBackoff,
}

p.registerPluginMetrics()
@@ -274,9 +301,10 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) {

table := "table1"

retryCnt := 1
config := Config{
Columns: columns,
Retry: 3,
Retry: retryCnt,
}

ctl := gomock.NewController(t)
@@ -296,12 +324,19 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) {
builder, err := NewQueryBuilder(columns, table)
require.NoError(t, err)

retryBackoff := backoff.WithMaxRetries(
backoff.WithContext(
backoff.NewExponentialBackOff(),
ctx),
3,
)
p := &Plugin{
config: &config,
queryBuilder: builder,
pool: pool,
logger: testLogger,
ctx: ctx,
backoff: retryBackoff,
}

p.registerPluginMetrics()
@@ -448,9 +483,10 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te

table := "table1"

retryCnt := 999
config := Config{
Columns: columns,
Retry: 3,
Retry: retryCnt,
}

ctl := gomock.NewController(t)
@@ -471,12 +507,19 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te
builder, err := NewQueryBuilder(columns, table)
require.NoError(t, err)

retryBackoff := backoff.WithMaxRetries(
backoff.WithContext(
backoff.NewExponentialBackOff(),
ctx),
3,
)
p := &Plugin{
config: &config,
queryBuilder: builder,
pool: pool,
logger: testLogger,
ctx: ctx,
backoff: retryBackoff,
}

p.registerPluginMetrics()
28 changes: 18 additions & 10 deletions plugin/output/splunk/splunk.go
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ import (
"net/http"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
@@ -36,6 +37,7 @@ type Plugin struct {
avgEventSize int
batcher *pipeline.Batcher
controller pipeline.OutputPluginController
backoff backoff.BackOff
}

//! config-params
@@ -98,6 +100,14 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.config = config.(*Config)
p.client = p.newClient(p.config.RequestTimeout_)

ctx := context.TODO()
andrewmed marked this conversation as resolved.
Show resolved Hide resolved
stdBackoff := backoff.NewExponentialBackOff()
stdBackoff.Multiplier = 1.2
stdBackoff.RandomizationFactor = 0.25
stdBackoff.InitialInterval = time.Second
stdBackoff.MaxInterval = stdBackoff.InitialInterval * 2
p.backoff = backoff.WithContext(stdBackoff, ctx)

ansakharov marked this conversation as resolved.
Show resolved Hide resolved
p.registerPluginMetrics()

p.batcher = pipeline.NewBatcher(
@@ -112,7 +122,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
0,
)

p.batcher.Start(context.TODO())
p.batcher.Start(ctx)
}

func (p *Plugin) registerPluginMetrics() {
@@ -154,18 +164,16 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
insaneJSON.Release(root)
data.outBuf = outBuf

for {
err := p.send(outBuf)
if err != nil {
_ = backoff.Retry(func() error {
sendErr := p.send(outBuf)
if sendErr != nil {
stats.GetCounter(subsystemName, sendErrorCounter).Inc()
p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, err.Error())
time.Sleep(time.Second)
p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, sendErr.Error())

continue
return sendErr
}

break
}
return nil
}, p.backoff)
}

func (p *Plugin) maintenance(workerData *pipeline.WorkerData) {}
13 changes: 12 additions & 1 deletion plugin/output/splunk/splunk_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package splunk

import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ozontech/file.d/pipeline"
"github.com/stretchr/testify/assert"
insaneJSON "github.com/vitkovskii/insane-json"
@@ -44,11 +47,19 @@ func TestSplunk(t *testing.T) {
}))
defer testServer.Close()

ctx := context.TODO()
stdBackoff := backoff.NewExponentialBackOff()
stdBackoff.Multiplier = 1.2
stdBackoff.RandomizationFactor = 0.25
stdBackoff.InitialInterval = time.Second
stdBackoff.MaxInterval = stdBackoff.InitialInterval * 2

plugin := Plugin{
config: &Config{
Endpoint: testServer.URL,
},
logger: zap.NewExample().Sugar(),
logger: zap.NewExample().Sugar(),
backoff: backoff.WithContext(stdBackoff, ctx),
}

batch := pipeline.Batch{