diff --git a/go.mod b/go.mod
index 87ec647af..590f36e84 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/alicebob/miniredis/v2 v2.30.5
github.com/bitly/go-simplejson v0.5.1
+ github.com/cenkalti/backoff/v4 v4.2.1
github.com/cespare/xxhash/v2 v2.2.0
github.com/euank/go-kmsg-parser v2.0.0+incompatible
github.com/go-redis/redis v6.15.9+incompatible
diff --git a/go.sum b/go.sum
index aeba12cfb..5312332e4 100644
--- a/go.sum
+++ b/go.sum
@@ -33,6 +33,8 @@ github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pg
github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
+github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
+github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
diff --git a/pipeline/backoff.go b/pipeline/backoff.go
new file mode 100644
index 000000000..af3401366
--- /dev/null
+++ b/pipeline/backoff.go
@@ -0,0 +1,76 @@
+package pipeline
+
+import (
+ "context"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+)
+
+type RetriableBatcher struct {
+ outFn RetriableBatcherOutFn
+ backoff backoff.BackOff
+ batcher *Batcher
+ onRetryError func(err error)
+}
+
+type RetriableBatcherOutFn func(*WorkerData, *Batch) error
+
+type BackoffOpts struct {
+ MinRetention time.Duration
+ Multiplier float64
+ AttemptNum uint64
+}
+
+func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher {
+ boff := GetBackoff(
+ opts.MinRetention,
+ opts.Multiplier,
+ opts.AttemptNum,
+ )
+
+ batcherBackoff := &RetriableBatcher{
+ outFn: batcherOutFn,
+ backoff: boff,
+ onRetryError: onError,
+ }
+ batcherBackoff.setBatcher(batcherOpts)
+ return batcherBackoff
+}
+
+func (b *RetriableBatcher) setBatcher(batcherOpts *BatcherOptions) {
+ batcherOpts.OutFn = b.Out
+ b.batcher = NewBatcher(*batcherOpts)
+}
+
+func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) {
+ b.backoff.Reset()
+
+ err := backoff.Retry(func() error {
+ return b.outFn(data, batch)
+ }, b.backoff)
+
+ if err != nil {
+ b.onRetryError(err)
+ }
+}
+
+func (b *RetriableBatcher) Start(ctx context.Context) {
+ b.batcher.Start(ctx)
+}
+
+func (b *RetriableBatcher) Stop() {
+ b.batcher.Stop()
+}
+
+func (b *RetriableBatcher) Add(event *Event) {
+ b.batcher.Add(event)
+}
+
+func GetBackoff(minRetention time.Duration, multiplier float64, attemptNum uint64) backoff.BackOff {
+ expBackoff := backoff.NewExponentialBackOff()
+ expBackoff.InitialInterval = minRetention
+ expBackoff.Multiplier = multiplier
+ expBackoff.RandomizationFactor = 0.5
+ return backoff.WithMaxRetries(expBackoff, attemptNum)
+}
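For reference, a standalone sketch of the retry policy that `GetBackoff` assembles from `cenkalti/backoff/v4` and of how `backoff.Retry` consumes it inside `RetriableBatcher.Out`. The concrete values (50ms retention, multiplier 2, 10 attempts) are only illustrative, not defaults mandated by this file:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Same construction as GetBackoff(50*time.Millisecond, 2, 10):
	// delays grow roughly 50ms, 100ms, 200ms, ... with ±50% jitter.
	exp := backoff.NewExponentialBackOff()
	exp.InitialInterval = 50 * time.Millisecond
	exp.Multiplier = 2
	exp.RandomizationFactor = 0.5
	policy := backoff.WithMaxRetries(exp, 10) // at most 10 retries after the first attempt

	attempts := 0
	err := backoff.Retry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure") // retried after the next backoff interval
		}
		return nil // success stops the retries
	}, policy)
	fmt.Println("attempts:", attempts, "err:", err)
}
```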
diff --git a/pipeline/backoff_test.go b/pipeline/backoff_test.go
new file mode 100644
index 000000000..1d73a2bf1
--- /dev/null
+++ b/pipeline/backoff_test.go
@@ -0,0 +1,62 @@
+package pipeline
+
+import (
+ "errors"
+ "testing"
+
+ "github.com/ozontech/file.d/metric"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/stretchr/testify/assert"
+ "go.uber.org/atomic"
+)
+
+func TestBackoff(t *testing.T) {
+ errorCount := &atomic.Int32{}
+ errorCountBefore := errorCount.Load()
+
+ eventCount := &atomic.Int32{}
+ eventCountBefore := eventCount.Load()
+
+ errorFn := func(err error) {
+ errorCount.Inc()
+ }
+
+ batcherBackoff := NewRetriableBatcher(
+ &BatcherOptions{
+ MetricCtl: metric.NewCtl("", prometheus.NewRegistry()),
+ },
+ func(workerData *WorkerData, batch *Batch) error {
+ eventCount.Inc()
+ return nil
+ },
+ BackoffOpts{AttemptNum: 3},
+ errorFn,
+ )
+
+ batcherBackoff.Out(nil, nil)
+
+ assert.Equal(t, errorCountBefore, errorCount.Load(), "wrong error count")
+ assert.Equal(t, eventCountBefore+1, eventCount.Load(), "wrong event count")
+}
+
+func TestBackoffWithError(t *testing.T) {
+ errorCount := &atomic.Int32{}
+ prevValue := errorCount.Load()
+ errorFn := func(err error) {
+ errorCount.Inc()
+ }
+
+ batcherBackoff := NewRetriableBatcher(
+ &BatcherOptions{
+ MetricCtl: metric.NewCtl("", prometheus.NewRegistry()),
+ },
+ func(workerData *WorkerData, batch *Batch) error {
+ return errors.New("some error")
+ },
+ BackoffOpts{AttemptNum: 3},
+ errorFn,
+ )
+
+ batcherBackoff.Out(nil, nil)
+ assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count")
+}
diff --git a/plugin/README.md b/plugin/README.md
index 3f8e0e06d..822431a6f 100755
--- a/plugin/README.md
+++ b/plugin/README.md
@@ -733,9 +733,9 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
- actions:
- - type: json_decode
- field: message
+ actions:
+ - type: json_decode
+ field: message
output:
type: s3
file_config:
@@ -759,12 +759,12 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
- actions:
- - type: json_decode
- field: message
+ actions:
+ - type: json_decode
+ field: message
output:
type: s3
- file_plugin:
+ file_config:
retention_interval: 10s
# endpoint, access_key, secret_key, bucket are required.
endpoint: "s3.fake_host.org:80"
diff --git a/plugin/output/README.md b/plugin/output/README.md
index daff2a16b..9b05cc4ef 100755
--- a/plugin/output/README.md
+++ b/plugin/output/README.md
@@ -72,9 +72,9 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
- actions:
- - type: json_decode
- field: message
+ actions:
+ - type: json_decode
+ field: message
output:
type: s3
file_config:
@@ -98,12 +98,12 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
- actions:
- - type: json_decode
- field: message
+ actions:
+ - type: json_decode
+ field: message
output:
type: s3
- file_plugin:
+ file_config:
retention_interval: 10s
# endpoint, access_key, secret_key, bucket are required.
endpoint: "s3.fake_host.org:80"
diff --git a/plugin/output/clickhouse/README.md b/plugin/output/clickhouse/README.md
index cc07b5180..a0a7ce227 100644
--- a/plugin/output/clickhouse/README.md
+++ b/plugin/output/clickhouse/README.md
@@ -111,7 +111,14 @@ If the strict mode is enabled file.d fails (exit with code 1) in above examples.
**`retry`** *`int`* *`default=10`*
Retries of insertion. If File.d cannot insert for this number of attempts,
-File.d will fall with non-zero exit code.
+File.d will fail with a non-zero exit code or skip the batch (see fatal_on_failed_insert).
+
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+After an insert error, whether to fail with a non-zero exit code or not
+**Experimental feature**
@@ -128,6 +135,12 @@ Retention milliseconds for retry to DB.
+**`retention_exponentially_multiplier`** *`int`* *`default=2`*
+
+Multiplier for exponential increase of retention between retries
+
+
+
**`insert_timeout`** *`cfg.Duration`* *`default=10s`*
Timeout for each insert request.
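Taken together, a hypothetical ClickHouse output section exercising the new knobs could look like the following (values are illustrative; only the four retry-related fields are new in this change, the rest of the output config is elided):

```yaml
output:
  type: clickhouse
  # ... addresses, database, table, columns as before ...
  retry: 5                               # attempts before giving up on a batch
  fatal_on_failed_insert: false          # log and drop the batch instead of exiting
  retention: 100ms                       # initial delay between attempts
  retention_exponentially_multiplier: 2  # each delay roughly doubles (with jitter)
```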
diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go
index 3e82a10b5..25ef5e8ac 100644
--- a/plugin/output/clickhouse/clickhouse.go
+++ b/plugin/output/clickhouse/clickhouse.go
@@ -20,6 +20,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
)
/*{ introduction
@@ -43,7 +44,7 @@ type Plugin struct {
logger *zap.Logger
config *Config
- batcher *pipeline.Batcher
+ batcher *pipeline.RetriableBatcher
ctx context.Context
cancelFunc context.CancelFunc
@@ -227,9 +228,15 @@ type Config struct {
// > @3@4@5@6
// >
// > Retries of insertion. If File.d cannot insert for this number of attempts,
- // > File.d will fall with non-zero exit code.
+ // > File.d will fail with a non-zero exit code or skip the batch (see fatal_on_failed_insert).
Retry int `json:"retry" default:"10"` // *
+ // > @3@4@5@6
+ // >
+ // > After an insert error, whether to fail with a non-zero exit code or not
+ // > **Experimental feature**
+ FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
+
// > @3@4@5@6
// >
// > Additional settings to the Clickhouse.
@@ -242,6 +249,11 @@ type Config struct {
Retention cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // *
Retention_ time.Duration
+ // > @3@4@5@6
+ // >
+ // > Multiplier for exponential increase of retention between retries
+ RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
+
// > @3@4@5@6
// >
// > Timeout for each insert request.
@@ -328,9 +340,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.registerMetrics(params.MetricCtl)
p.ctx, p.cancelFunc = context.WithCancel(context.Background())
- if p.config.Retry < 1 {
- p.logger.Fatal("'retry' can't be <1")
- }
if p.config.Retention_ < 1 {
p.logger.Fatal("'retention' can't be <1")
}
@@ -405,17 +414,42 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
}
}
- p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
+ batcherOpts := pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
- OutFn: p.out,
Controller: params.Controller,
Workers: p.config.WorkersCount_,
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MetricCtl: params.MetricCtl,
- })
+ }
+
+ backoffOpts := pipeline.BackoffOpts{
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: uint64(p.config.Retry),
+ }
+
+ onError := func(err error) {
+ var level zapcore.Level
+ if p.config.FatalOnFailedInsert {
+ level = zapcore.FatalLevel
+ } else {
+ level = zapcore.ErrorLevel
+ }
+
+ p.logger.Log(level, "can't insert to the table", zap.Error(err),
+ zap.Int("retries", p.config.Retry),
+ zap.String("table", p.config.Table))
+ }
+
+ p.batcher = pipeline.NewRetriableBatcher(
+ &batcherOpts,
+ p.out,
+ backoffOpts,
+ onError,
+ )
p.batcher.Start(p.ctx)
}
@@ -443,7 +477,7 @@ func (d data) reset() {
}
}
-func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
+func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
if *workerData == nil {
// we don't check the error, schema already validated in the Start
columns, _ := inferInsaneColInputs(p.config.Columns)
@@ -484,22 +518,23 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
})
var err error
- for try := 0; try < p.config.Retry; try++ {
+ for i := range p.instances {
requestID := p.requestID.Inc()
- clickhouse := p.getInstance(requestID, try)
- err = p.do(clickhouse, data.input)
+ clickhouse := p.getInstance(requestID, i)
+ err = p.do(clickhouse, data.input)
if err == nil {
- break
+ return nil
}
- p.insertErrorsMetric.Inc()
- time.Sleep(p.config.Retention_)
- p.logger.Error("an attempt to insert a batch failed", zap.Error(err))
}
if err != nil {
- p.logger.Fatal("can't insert to the table", zap.Error(err),
- zap.Int("retries", p.config.Retry),
- zap.String("table", p.config.Table))
+ p.insertErrorsMetric.Inc()
+ p.logger.Error(
+ "an attempt to insert a batch failed",
+ zap.Error(err),
+ )
}
+
+ return err
}
func (p *Plugin) do(clickhouse Clickhouse, queryInput proto.Input) error {
diff --git a/plugin/output/elasticsearch/README.md b/plugin/output/elasticsearch/README.md
index a7908aa47..b8587e752 100755
--- a/plugin/output/elasticsearch/README.md
+++ b/plugin/output/elasticsearch/README.md
@@ -94,5 +94,31 @@ Operation type to be used in batch requests. It can be `index` or `create`. Defa
+**`retry`** *`int`* *`default=10`*
+
+Retries of insertion. If File.d cannot insert for this number of attempts,
+File.d will fail with a non-zero exit code or skip the batch (see fatal_on_failed_insert).
+
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+After an insert error, whether to fail with a non-zero exit code or not
+**Experimental feature**
+
+
+
+**`retention`** *`cfg.Duration`* *`default=1s`*
+
+Retention milliseconds for retry to Elasticsearch.
+
+
+
+**`retention_exponentially_multiplier`** *`int`* *`default=2`*
+
+Multiplier for exponential increase of retention between retries
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go
index 3dd9e9cc3..4553cf59b 100644
--- a/plugin/output/elasticsearch/elasticsearch.go
+++ b/plugin/output/elasticsearch/elasticsearch.go
@@ -19,6 +19,7 @@ import (
"github.com/valyala/fasthttp"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
)
/*{ introduction
@@ -29,7 +30,6 @@ If a network error occurs, the batch will infinitely try to be delivered to the
const (
outPluginType = "elasticsearch"
NDJSONContentType = "application/x-ndjson"
- retryDelay = time.Second
)
var (
@@ -46,7 +46,7 @@ type Plugin struct {
avgEventSize int
time string
headerPrefix string
- batcher *pipeline.Batcher
+ batcher *pipeline.RetriableBatcher
controller pipeline.OutputPluginController
mu *sync.Mutex
@@ -139,6 +139,29 @@ type Config struct {
// > Operation type to be used in batch requests. It can be `index` or `create`. Default is `index`.
// > > Check out [_bulk API doc](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html) for details.
BatchOpType string `json:"batch_op_type" default:"index" options:"index|create"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Retries of insertion. If File.d cannot insert for this number of attempts,
+ // > File.d will fail with a non-zero exit code or skip the batch (see fatal_on_failed_insert).
+ Retry int `json:"retry" default:"10"` // *
+
+ // > @3@4@5@6
+ // >
+ // > After an insert error, whether to fail with a non-zero exit code or not
+ // > **Experimental feature**
+ FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Retention milliseconds for retry to Elasticsearch.
+ Retention cfg.Duration `json:"retention" default:"1s" parse:"duration"` // *
+ Retention_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Multiplier for exponential increase of retention between retries
+ RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
}
type data struct {
@@ -203,10 +226,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.maintenance(nil)
p.logger.Info("starting batcher", zap.Duration("timeout", p.config.BatchFlushTimeout_))
- p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
+
+ batcherOpts := pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
- OutFn: p.out,
MaintenanceFn: p.maintenance,
Controller: p.controller,
Workers: p.config.WorkersCount_,
@@ -215,7 +238,33 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
FlushTimeout: p.config.BatchFlushTimeout_,
MaintenanceInterval: time.Minute,
MetricCtl: params.MetricCtl,
- })
+ }
+
+ backoffOpts := pipeline.BackoffOpts{
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: uint64(p.config.Retry),
+ }
+
+ onError := func(err error) {
+ var level zapcore.Level
+ if p.config.FatalOnFailedInsert {
+ level = zapcore.FatalLevel
+ } else {
+ level = zapcore.ErrorLevel
+ }
+
+ p.logger.Log(level, "can't send to the elastic", zap.Error(err),
+ zap.Int("retries", p.config.Retry),
+ )
+ }
+
+ p.batcher = pipeline.NewRetriableBatcher(
+ &batcherOpts,
+ p.out,
+ backoffOpts,
+ onError,
+ )
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel
@@ -237,7 +286,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.indexingErrorsMetric = ctl.RegisterCounter("output_elasticsearch_index_error", "Number of elasticsearch indexing errors")
}
-func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
+func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
@@ -255,14 +304,12 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
data.outBuf = p.appendEvent(data.outBuf, event)
})
- for {
- if err := p.send(data.outBuf); err != nil {
- p.sendErrorMetric.Inc()
- p.logger.Error("can't send to the elastic, will try other endpoint", zap.Error(err))
- } else {
- break
- }
+ err := p.send(data.outBuf)
+ if err != nil {
+ p.sendErrorMetric.Inc()
+ p.logger.Error("can't send to the elastic, will try other endpoint", zap.Error(err))
}
+ return err
}
func (p *Plugin) send(body []byte) error {
@@ -279,14 +326,12 @@ func (p *Plugin) send(body []byte) error {
p.setAuthHeader(req)
if err := p.client.DoTimeout(req, resp, p.config.ConnectionTimeout_); err != nil {
- time.Sleep(retryDelay)
return fmt.Errorf("can't send batch to %s: %s", endpoint.String(), err.Error())
}
respContent := resp.Body()
if statusCode := resp.Header.StatusCode(); statusCode < http.StatusOK || statusCode > http.StatusAccepted {
- time.Sleep(retryDelay)
return fmt.Errorf("response status from %s isn't OK: status=%d, body=%s", endpoint.String(), statusCode, string(respContent))
}
diff --git a/plugin/output/gelf/README.md b/plugin/output/gelf/README.md
index e3073a300..41ff37bc6 100755
--- a/plugin/output/gelf/README.md
+++ b/plugin/output/gelf/README.md
@@ -118,5 +118,31 @@ After this timeout the batch will be sent even if batch isn't completed.
+**`retry`** *`int`* *`default=0`*
+
+Retries of insertion. If File.d cannot insert for this number of attempts,
+File.d will fail with a non-zero exit code or skip the batch (see fatal_on_failed_insert).
+
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+After an insert error, whether to fail with a non-zero exit code or not
+**Experimental feature**
+
+
+
+**`retention`** *`cfg.Duration`* *`default=1s`*
+
+Retention milliseconds for retry to the GELF endpoint.
+
+
+
+**`retention_exponentially_multiplier`** *`int`* *`default=2`*
+
+Multiplier for exponential increase of retention between retries
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/gelf/gelf.go b/plugin/output/gelf/gelf.go
index 1abf1fdbd..95757151a 100644
--- a/plugin/output/gelf/gelf.go
+++ b/plugin/output/gelf/gelf.go
@@ -12,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
)
/*{ introduction
@@ -41,7 +42,7 @@ type Plugin struct {
config *Config
logger *zap.SugaredLogger
avgEventSize int
- batcher *pipeline.Batcher
+ batcher *pipeline.RetriableBatcher
controller pipeline.OutputPluginController
// plugin metrics
@@ -144,6 +145,29 @@ type Config struct {
BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // *
BatchFlushTimeout_ time.Duration
+ // > @3@4@5@6
+ // >
+ // > Retries of insertion. If File.d cannot insert for this number of attempts,
+ // > File.d will fail with a non-zero exit code or skip the batch (see fatal_on_failed_insert).
+ Retry int `json:"retry" default:"0"` // *
+
+ // > @3@4@5@6
+ // >
+ // > After an insert error, whether to fail with a non-zero exit code or not
+ // > **Experimental feature**
+ FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Retention milliseconds for retry to the GELF endpoint.
+ Retention cfg.Duration `json:"retention" default:"1s" parse:"duration"` // *
+ Retention_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Multiplier for exponential increase of retention between retries
+ RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
+
// fields converted to extra fields GELF format
hostField string
shortMessageField string
@@ -190,10 +214,9 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.config.timestampFieldFormat = format
p.config.levelField = pipeline.ByteToStringUnsafe(p.formatExtraField(nil, p.config.LevelField))
- p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
+ batcherOpts := pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
- OutFn: p.out,
MaintenanceFn: p.maintenance,
Controller: p.controller,
Workers: p.config.WorkersCount_,
@@ -202,7 +225,33 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
FlushTimeout: p.config.BatchFlushTimeout_,
MaintenanceInterval: p.config.ReconnectInterval_,
MetricCtl: params.MetricCtl,
- })
+ }
+
+ backoffOpts := pipeline.BackoffOpts{
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: uint64(p.config.Retry),
+ }
+
+ onError := func(err error) {
+ var level zapcore.Level
+ if p.config.FatalOnFailedInsert {
+ level = zapcore.FatalLevel
+ } else {
+ level = zapcore.ErrorLevel
+ }
+
+ p.logger.Desugar().Log(level, "can't send to gelf", zap.Error(err),
+ zap.Int("retries", p.config.Retry),
+ )
+ }
+
+ p.batcher = pipeline.NewRetriableBatcher(
+ &batcherOpts,
+ p.out,
+ backoffOpts,
+ onError,
+ )
p.batcher.Start(context.TODO())
}
@@ -219,7 +268,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.sendErrorMetric = ctl.RegisterCounter("output_gelf_send_error", "Total GELF send errors")
}
-func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
+func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
@@ -245,32 +294,30 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
data.outBuf = outBuf
data.encodeBuf = encodeBuf
- for {
- if data.gelf == nil {
- p.logger.Infof("connecting to gelf address=%s", p.config.Endpoint)
-
- gelf, err := newClient(p.config.Endpoint, p.config.ConnectionTimeout_, p.config.WriteTimeout_, false, nil)
- if err != nil {
- p.sendErrorMetric.Inc()
- p.logger.Errorf("can't connect to gelf endpoint address=%s: %s", p.config.Endpoint, err.Error())
- time.Sleep(time.Second)
- continue
- }
- data.gelf = gelf
- }
+ if data.gelf == nil {
+ p.logger.Infof("connecting to gelf address=%s", p.config.Endpoint)
- _, err := data.gelf.send(outBuf)
+ gelf, err := newClient(p.config.Endpoint, p.config.ConnectionTimeout_, p.config.WriteTimeout_, false, nil)
if err != nil {
p.sendErrorMetric.Inc()
- p.logger.Errorf("can't send data to gelf address=%s, err: %s", p.config.Endpoint, err.Error())
- _ = data.gelf.close()
- data.gelf = nil
+ p.logger.Errorf("can't connect to gelf endpoint address=%s: %s", p.config.Endpoint, err.Error())
time.Sleep(time.Second)
- continue
+ return err
}
+ data.gelf = gelf
+ }
- break
+ _, err := data.gelf.send(outBuf)
+ if err != nil {
+ p.sendErrorMetric.Inc()
+ p.logger.Errorf("can't send data to gelf address=%s, err: %s", p.config.Endpoint, err.Error())
+ _ = data.gelf.close()
+ data.gelf = nil
+ time.Sleep(time.Second)
+ return err
}
+
+ return nil
}
func (p *Plugin) maintenance(workerData *pipeline.WorkerData) {
diff --git a/plugin/output/kafka/README.md b/plugin/output/kafka/README.md
index 279784df5..0dc0e3e05 100755
--- a/plugin/output/kafka/README.md
+++ b/plugin/output/kafka/README.md
@@ -57,6 +57,32 @@ After this timeout the batch will be sent even if batch isn't full.
+**`retry`** *`int`* *`default=10`*
+
+Retries of insertion. If File.d cannot insert for this number of attempts,
+File.d will fail with a non-zero exit code or skip the batch (see fatal_on_failed_insert).
+
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+After an insert error, whether to fail with a non-zero exit code or not
+**Experimental feature**
+
+
+
+**`retention`** *`cfg.Duration`* *`default=50ms`*
+
+Retention milliseconds for retry.
+
+
+
+**`retention_exponentially_multiplier`** *`int`* *`default=2`*
+
+Multiplier for exponential increase of retention between retries
+
+
+
**`is_sasl_enabled`** *`bool`* *`default=false`*
If set, the plugin will use SASL authentications mechanism.
diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go
index a1c2c0c7a..ef880beb8 100644
--- a/plugin/output/kafka/kafka.go
+++ b/plugin/output/kafka/kafka.go
@@ -14,6 +14,7 @@ import (
"github.com/ozontech/file.d/xtls"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
)
/*{ introduction
@@ -36,7 +37,7 @@ type Plugin struct {
controller pipeline.OutputPluginController
producer sarama.SyncProducer
- batcher *pipeline.Batcher
+ batcher *pipeline.RetriableBatcher
// plugin metrics
sendErrorMetric prometheus.Counter
@@ -95,6 +96,29 @@ type Config struct {
BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // *
BatchFlushTimeout_ time.Duration
+ // > @3@4@5@6
+ // >
+ // > Retries of insertion. If File.d cannot insert for this number of attempts,
+ // > File.d will fail with a non-zero exit code or skip the batch (see fatal_on_failed_insert).
+ Retry int `json:"retry" default:"10"` // *
+
+ // > @3@4@5@6
+ // >
+ // > After an insert error, whether to fail with a non-zero exit code or not
+ // > **Experimental feature**
+ FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Retention milliseconds for retry.
+ Retention cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // *
+ Retention_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Multiplier for exponential increase of retention between retries
+ RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
+
// > @3@4@5@6
// >
// > If set, the plugin will use SASL authentications mechanism.
@@ -149,20 +173,50 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.controller = params.Controller
p.registerMetrics(params.MetricCtl)
+ if p.config.Retention_ < 1 {
+ p.logger.Fatal("'retention' can't be <1")
+ }
+
p.logger.Infof("workers count=%d, batch size=%d", p.config.WorkersCount_, p.config.BatchSize_)
p.producer = NewProducer(p.config, p.logger)
- p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
+
+ batcherOpts := pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
- OutFn: p.out,
Controller: p.controller,
Workers: p.config.WorkersCount_,
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MetricCtl: params.MetricCtl,
- })
+ }
+
+ backoffOpts := pipeline.BackoffOpts{
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: uint64(p.config.Retry),
+ }
+
+ onError := func(err error) {
+ var level zapcore.Level
+ if p.config.FatalOnFailedInsert {
+ level = zapcore.FatalLevel
+ } else {
+ level = zapcore.ErrorLevel
+ }
+
+ p.logger.Desugar().Log(level, "can't write batch", zap.Error(err),
+ zap.Int("retries", p.config.Retry),
+ )
+ }
+
+ p.batcher = pipeline.NewRetriableBatcher(
+ &batcherOpts,
+ p.out,
+ backoffOpts,
+ onError,
+ )
p.batcher.Start(context.TODO())
}
@@ -175,7 +229,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.sendErrorMetric = ctl.RegisterCounter("output_kafka_send_errors", "Total Kafka send errors")
}
-func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
+func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
if *workerData == nil {
*workerData = &data{
messages: make([]*sarama.ProducerMessage, p.config.BatchSize_),
@@ -211,8 +265,6 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
i++
})
- data.outBuf = outBuf
-
err := p.producer.SendMessages(data.messages[:i])
if err != nil {
errs := err.(sarama.ProducerErrors)
@@ -220,8 +272,13 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
p.logger.Errorf("can't write batch: %s", e.Err.Error())
}
p.sendErrorMetric.Add(float64(len(errs)))
- p.controller.Error("some events from batch were not written")
+ p.logger.Errorf("an attempt to insert a batch failed: %s", err.Error())
}
+
+ return err
}
func (p *Plugin) Stop() {
diff --git a/plugin/output/postgres/README.idoc.md b/plugin/output/postgres/README.idoc.md
index 2037fba64..657419cef 100644
--- a/plugin/output/postgres/README.idoc.md
+++ b/plugin/output/postgres/README.idoc.md
@@ -3,3 +3,6 @@
### Config params
@config-params|description
+
+### Example
+@example
diff --git a/plugin/output/postgres/README.md b/plugin/output/postgres/README.md
index bb14ce874..9bfd08881 100755
--- a/plugin/output/postgres/README.md
+++ b/plugin/output/postgres/README.md
@@ -38,9 +38,17 @@ and nullable options.
-**`retry`** *`int`* *`default=3`*
+**`retry`** *`int`* *`default=10`*
-Retries of insertion.
+Retries of insertion. If File.d cannot insert for this number of attempts,
+File.d will fail with a non-zero exit code or skip the batch (see fatal_on_failed_insert).
+
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+After an insert error, whether to fail with a non-zero exit code or not
+**Experimental feature**
@@ -52,6 +60,10 @@ Retention milliseconds for retry to DB.
+**`retention_exponentially_multiplier`** *`int`* *`default=2`*
+
+Multiplier for exponential increase of retention between retries
+
**`db_request_timeout`** *`cfg.Duration`* *`default=3000ms`*

Timeout for DB requests in milliseconds.
@@ -91,4 +102,37 @@ After this timeout batch will be sent even if batch isn't completed.
+### Example
+**Example**
+Postgres output example:
+```yaml
+pipelines:
+ example_pipeline:
+ input:
+ type: file
+ persistence_mode: async
+ watching_dir: ./
+ filename_pattern: input_example.json
+ offsets_file: ./offsets.yaml
+ offsets_op: reset
+ output:
+ type: postgres
+ conn_string: "user=postgres host=localhost port=5432 dbname=postgres sslmode=disable pool_max_conns=10"
+ table: events
+ columns:
+ - name: id
+ type: int
+ - name: name
+ type: string
+ retry: 10
+ retention: 1s
+ retention_exponentially_multiplier: 2
+```
+
+input_example.json
+```json
+{"id":1,"name":"name1"}
+{"id":2,"name":"name2"}
+```
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go
index a793e577e..531396b75 100644
--- a/plugin/output/postgres/postgres.go
+++ b/plugin/output/postgres/postgres.go
@@ -17,12 +17,47 @@ import (
"github.com/prometheus/client_golang/prometheus"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
)
/*{ introduction
It sends the event batches to postgres db using pgx.
}*/
+/*{ example
+**Example**
+Postgres output example:
+```yaml
+pipelines:
+ example_pipeline:
+ input:
+ type: file
+ persistence_mode: async
+ watching_dir: ./
+ filename_pattern: input_example.json
+ offsets_file: ./offsets.yaml
+ offsets_op: reset
+ output:
+ type: postgres
+ conn_string: "user=postgres host=localhost port=5432 dbname=postgres sslmode=disable pool_max_conns=10"
+ table: events
+ columns:
+ - name: id
+ type: int
+ - name: name
+ type: string
+ retry: 10
+ retention: 1s
+ retention_exponentially_multiplier: 2
+```
+
+input_example.json
+```json
+{"id":1,"name":"name1"}
+{"id":2,"name":"name2"}
+```
+}*/
+
var (
ErrEventDoesntHaveField = errors.New("event doesn't have field")
ErrEventFieldHasWrongType = errors.New("event field has wrong type")
@@ -63,7 +98,7 @@ type Plugin struct {
controller pipeline.OutputPluginController
logger *zap.SugaredLogger
config *Config
- batcher *pipeline.Batcher
+ batcher *pipeline.RetriableBatcher
ctx context.Context
cancelFunc context.CancelFunc
@@ -71,9 +106,11 @@ type Plugin struct {
pool PgxIface
// plugin metrics
+
discardedEventMetric prometheus.Counter
duplicatedEventMetric prometheus.Counter
writtenEventMetric prometheus.Counter
+ insertErrorsMetric prometheus.Counter
}
type ConfigColumn struct {
@@ -119,8 +156,15 @@ type Config struct {
// > @3@4@5@6
// >
- // > Retries of insertion.
- Retry int `json:"retry" default:"3"` // *
+ // > Retries of insertion. If File.d cannot insert for this number of attempts,
+ // > File.d will fail with a non-zero exit code or skip the batch (see fatal_on_failed_insert).
+ Retry int `json:"retry" default:"10"` // *
+
+ // > @3@4@5@6
+ // >
+ // > After an insert error, whether to fail with a non-zero exit code or not
+ // > **Experimental feature**
+ FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
// > @3@4@5@6
// >
@@ -128,6 +172,11 @@ type Config struct {
Retention cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // *
Retention_ time.Duration
+ // > @3@4@5@6
+ // >
+ // > Multiplier for exponential increase of retention between retries
+ RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
+
// > @3@4@5@6
// >
// > Timeout for DB requests in milliseconds.
@@ -182,6 +231,7 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.discardedEventMetric = ctl.RegisterCounter("output_postgres_event_discarded", "Total pgsql discarded messages")
p.duplicatedEventMetric = ctl.RegisterCounter("output_postgres_event_duplicated", "Total pgsql duplicated messages")
p.writtenEventMetric = ctl.RegisterCounter("output_postgres_event_written", "Total events written to pgsql")
+ p.insertErrorsMetric = ctl.RegisterCounter("output_postgres_insert_errors", "Total pgsql insert errors")
}
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
@@ -192,7 +242,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.registerMetrics(params.MetricCtl)
if len(p.config.Columns) == 0 {
- p.logger.Fatal("can't start plugin, no fields in config")
+ p.logger.Fatal("can't start plugin, no columns in config")
}
if p.config.Retry < 1 {
@@ -225,17 +275,42 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
}
p.pool = pool
- p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
+ batcherOpts := pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
- OutFn: p.out,
Controller: p.controller,
Workers: p.config.WorkersCount_,
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MetricCtl: params.MetricCtl,
- })
+ }
+
+ backoffOpts := pipeline.BackoffOpts{
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: uint64(p.config.Retry),
+ }
+
+ onError := func(err error) {
+ var level zapcore.Level
+ if p.config.FatalOnFailedInsert {
+ level = zapcore.FatalLevel
+ } else {
+ level = zapcore.ErrorLevel
+ }
+
+ p.logger.Desugar().Log(level, "can't insert to the table", zap.Error(err),
+ zap.Int("retries", p.config.Retry),
+ zap.String("table", p.config.Table))
+ }
+
+ p.batcher = pipeline.NewRetriableBatcher(
+ &batcherOpts,
+ p.out,
+ backoffOpts,
+ onError,
+ )
ctx, cancel := context.WithCancel(context.Background())
p.ctx = ctx
@@ -254,7 +329,7 @@ func (p *Plugin) Out(event *pipeline.Event) {
p.batcher.Add(event)
}
-func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) {
+func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) error {
// _ *pipeline.WorkerData - doesn't required in this plugin, we can't parse
// events for uniques through bytes.
builder := p.queryBuilder.GetInsertBuilder()
@@ -299,7 +374,7 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) {
// no valid events passed.
if !anyValidValue {
- return
+ return nil
}
query, args, err := builder.ToSql()
@@ -314,22 +389,14 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) {
argsSliceInterface[i] = args[i-1]
}
- // Insert into pg with retry.
- for i := p.config.Retry; i > 0; i-- {
- err = p.try(query, argsSliceInterface)
- if err != nil {
- p.logger.Errorf("can't exec query: %s", err.Error())
- time.Sleep(p.config.Retention_)
- continue
- }
- p.writtenEventMetric.Add(float64(len(uniqueEventsMap)))
- break
- }
-
+ err = p.try(query, argsSliceInterface)
if err != nil {
- p.pool.Close()
- p.logger.Fatalf("failed insert into %s. query: %s, args: %v, err: %v", p.config.Table, query, args, err)
+ p.insertErrorsMetric.Inc()
+ p.logger.Errorf("can't exec query: %s", err.Error())
+ return err
}
+ p.writtenEventMetric.Add(float64(len(uniqueEventsMap)))
+ return nil
}
func (p *Plugin) try(query string, argsSliceInterface []any) error {
diff --git a/plugin/output/postgres/postgres_test.go b/plugin/output/postgres/postgres_test.go
index 522c874be..53d905ddd 100644
--- a/plugin/output/postgres/postgres_test.go
+++ b/plugin/output/postgres/postgres_test.go
@@ -147,12 +147,7 @@ func TestPrivateOutWithRetry(t *testing.T) {
gomock.AssignableToTypeOf(ctxMock),
"INSERT INTO table1 (str_uni_1,int_1,timestamp_1) VALUES ($1,$2,$3) ON CONFLICT(str_uni_1) DO UPDATE SET int_1=EXCLUDED.int_1,timestamp_1=EXCLUDED.timestamp_1",
[]any{preferSimpleProtocol, strUniValue, intValue, time.Unix(int64(timestampValue), 0).Format(time.RFC3339)},
- ).Return(&rowsForTest{}, errors.New("someError")).Times(2)
- mockpool.EXPECT().Query(
- gomock.AssignableToTypeOf(ctxMock),
- "INSERT INTO table1 (str_uni_1,int_1,timestamp_1) VALUES ($1,$2,$3) ON CONFLICT(str_uni_1) DO UPDATE SET int_1=EXCLUDED.int_1,timestamp_1=EXCLUDED.timestamp_1",
- []any{preferSimpleProtocol, strUniValue, intValue, time.Unix(int64(timestampValue), 0).Format(time.RFC3339)},
- ).Return(&rowsForTest{}, nil).Times(1)
+ ).Return(&rowsForTest{}, errors.New("someError")).Times(1)
builder, err := NewQueryBuilder(columns, table)
require.NoError(t, err)
diff --git a/plugin/output/s3/README.md b/plugin/output/s3/README.md
index 53bef4194..5b914e248 100755
--- a/plugin/output/s3/README.md
+++ b/plugin/output/s3/README.md
@@ -22,9 +22,9 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
- actions:
- - type: json_decode
- field: message
+ actions:
+ - type: json_decode
+ field: message
output:
type: s3
file_config:
@@ -48,12 +48,12 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
- actions:
- - type: json_decode
- field: message
+ actions:
+ - type: json_decode
+ field: message
output:
type: s3
- file_plugin:
+ file_config:
retention_interval: 10s
# endpoint, access_key, secret_key, bucket are required.
endpoint: "s3.fake_host.org:80"
@@ -144,4 +144,30 @@ Sets upload timeout.
+**`retry`** *`int`* *`default=10`*
+
+Retries of upload. If File.d cannot upload for this number of attempts,
+File.d will fail with a non-zero exit code or skip the file (see fatal_on_failed_insert).
+
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+After an insert error, whether to fail with a non-zero exit code or not
+**Experimental feature**
+
+
+
+**`retention`** *`cfg.Duration`* *`default=1s`*
+
+Retention milliseconds for retry to upload.
+
+
+
+**`retention_exponentially_multiplier`** *`int`* *`default=2`*
+
+Multiplier for exponential increase of retention between retries
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/s3/TESTME.md b/plugin/output/s3/TESTME.md
index c57992e3b..92389406c 100644
--- a/plugin/output/s3/TESTME.md
+++ b/plugin/output/s3/TESTME.md
@@ -52,12 +52,12 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
- actions:
- - type: json_decode
- field: message
+ actions:
+ - type: json_decode
+ field: message
output:
type: s3
- file_plugin:
+ file_config:
retention_interval: 10s
endpoint: "0.0.0.0:19001"
access_key: "minio_access_key"
diff --git a/plugin/output/s3/s3.go b/plugin/output/s3/s3.go
index b8d0050b5..84148db68 100644
--- a/plugin/output/s3/s3.go
+++ b/plugin/output/s3/s3.go
@@ -13,6 +13,7 @@ import (
"sync"
"time"
+ "github.com/cenkalti/backoff/v4"
"github.com/minio/minio-go"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
@@ -21,6 +22,7 @@ import (
"github.com/ozontech/file.d/plugin/output/file"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
)
/*{ introduction
@@ -47,9 +49,9 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
- actions:
- - type: json_decode
- field: message
+ actions:
+ - type: json_decode
+ field: message
output:
type: s3
file_config:
@@ -73,12 +75,12 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
- actions:
- - type: json_decode
- field: message
+ actions:
+ - type: json_decode
+ field: message
output:
type: s3
- file_plugin:
+ file_config:
retention_interval: 10s
# endpoint, access_key, secret_key, bucket are required.
endpoint: "s3.fake_host.org:80"
@@ -102,16 +104,14 @@ pipelines:
}*/
const (
- fileNameSeparator = "_"
- attemptIntervalMin = 1 * time.Second
- dirSep = "/"
- StaticBucketDir = "static_buckets"
- DynamicBucketDir = "dynamic_buckets"
+ fileNameSeparator = "_"
+ dirSep = "/"
+ StaticBucketDir = "static_buckets"
+ DynamicBucketDir = "dynamic_buckets"
)
var (
- attemptInterval = attemptIntervalMin
- compressors = map[string]func(*zap.SugaredLogger) compressor{
+ compressors = map[string]func(*zap.SugaredLogger) compressor{
zipName: newZipCompressor,
}
)
@@ -234,6 +234,29 @@ type Config struct {
// > Sets upload timeout.
UploadTimeout cfg.Duration `json:"upload_timeout" default:"1m" parse:"duration"` // *
UploadTimeout_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Retries of upload. If File.d cannot upload for this number of attempts,
+ // > File.d will fail with a non-zero exit code or skip the file (see fatal_on_failed_insert).
+ Retry int `json:"retry" default:"10"` // *
+
+ // > @3@4@5@6
+ // >
+ // > After an insert error, whether to fail with a non-zero exit code or not
+ // > **Experimental feature**
+ FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Retention milliseconds for retry to upload.
+ Retention cfg.Duration `json:"retention" default:"1s" parse:"duration"` // *
+ Retention_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Multiplier for exponential increase of retention between retries
+ RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
}
func (c *Config) IsMultiBucketExists(bucketName string) bool {
@@ -313,7 +336,11 @@ func (p *Plugin) StartWithMinio(config pipeline.AnyConfig, params *pipeline.Outp
p.compressCh = make(chan fileDTO, p.config.FileConfig.WorkersCount_)
for i := 0; i < p.config.FileConfig.WorkersCount_; i++ {
- go p.uploadWork()
+ go p.uploadWork(pipeline.GetBackoff(
+ p.config.Retention_,
+ float64(p.config.RetentionExponentMultiplier),
+ uint64(p.config.Retry),
+ ))
go p.compressWork()
}
err = p.startPlugins(params, outPlugCount, targetDirs, fileNames)
@@ -505,10 +532,10 @@ func (p *Plugin) addFileJobWithBucket(bucketName string) func(filename string) {
}
}
-func (p *Plugin) uploadWork() {
+func (p *Plugin) uploadWork(workerBackoff backoff.BackOff) {
for compressed := range p.uploadCh {
- sleepTime := attemptInterval
- for {
+ workerBackoff.Reset()
+ err := backoff.Retry(func() error {
p.logger.Infof("starting upload s3 object. fileName=%s, bucketName=%s", compressed.fileName, compressed.bucketName)
err := p.uploadToS3(compressed)
if err == nil {
@@ -519,11 +546,23 @@ func (p *Plugin) uploadWork() {
if err != nil && !os.IsNotExist(err) {
p.logger.Panicf("could not delete file: %s, err: %s", compressed, err.Error())
}
- break
+ return nil
}
- sleepTime += sleepTime
- p.logger.Errorf("could not upload object: %s, next attempt in %s, error: %s", compressed, sleepTime.String(), err.Error())
- time.Sleep(sleepTime)
+ p.logger.Errorf("could not upload object: %s, error: %s", compressed, err.Error())
+ return err
+ }, workerBackoff)
+
+ if err != nil {
+ var level zapcore.Level
+ if p.config.FatalOnFailedInsert {
+ level = zapcore.FatalLevel
+ } else {
+ level = zapcore.ErrorLevel
+ }
+
+ p.logger.Desugar().Log(level, "could not upload s3 object", zap.Error(err),
+ zap.Int("retries", p.config.Retry),
+ )
}
}
}
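One detail worth noting in `uploadWork`: each worker goroutine receives its own `backoff.BackOff` built via `pipeline.GetBackoff` (the policy carries mutable state and is not meant to be shared), and it is `Reset()` before every file so each upload starts again from the minimal retention. A condensed, hypothetical sketch of that pattern — `worker`, `jobs`, and `upload` are stand-ins for `uploadWork`, `uploadCh`, and `uploadToS3`:

```go
package main

import (
	"errors"
	"log"

	"github.com/cenkalti/backoff/v4"
)

// worker drains jobs and retries each upload with its own backoff policy.
func worker(jobs <-chan string, policy backoff.BackOff, upload func(string) error) {
	for name := range jobs {
		policy.Reset() // each file starts again from the initial interval
		if err := backoff.Retry(func() error { return upload(name) }, policy); err != nil {
			log.Printf("could not upload %s after retries: %v", name, err)
		}
	}
}

func main() {
	jobs := make(chan string, 1)
	jobs <- "logs-0001.zip"
	close(jobs)
	worker(jobs,
		backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3),
		func(string) error { return errors.New("s3 unreachable") })
}
```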
diff --git a/plugin/output/splunk/README.md b/plugin/output/splunk/README.md
index d5583c5f7..5b456845a 100755
--- a/plugin/output/splunk/README.md
+++ b/plugin/output/splunk/README.md
@@ -45,5 +45,31 @@ After this timeout the batch will be sent even if batch isn't completed.
+**`retry`** *`int`* *`default=10`*
+
+Retries of insertion. If File.d cannot insert for this number of attempts,
+File.d will fail with a non-zero exit code or skip the batch (see fatal_on_failed_insert).
+
+
+
+**`fatal_on_failed_insert`** *`bool`* *`default=false`*
+
+After an insert error, whether to fail with a non-zero exit code or not
+**Experimental feature**
+
+
+
+**`retention`** *`cfg.Duration`* *`default=1s`*
+
+Retention milliseconds for retry to Splunk.
+
+
+
+**`retention_exponentially_multiplier`** *`int`* *`default=2`*
+
+Multiplier for exponential increase of retention between retries
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go
index 269d13425..99cc07e8c 100644
--- a/plugin/output/splunk/splunk.go
+++ b/plugin/output/splunk/splunk.go
@@ -17,6 +17,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
)
/*{ introduction
@@ -32,7 +33,7 @@ type Plugin struct {
client http.Client
logger *zap.SugaredLogger
avgEventSize int
- batcher *pipeline.Batcher
+ batcher *pipeline.RetriableBatcher
controller pipeline.OutputPluginController
// plugin metrics
@@ -82,6 +83,29 @@ type Config struct {
// > After this timeout the batch will be sent even if batch isn't completed.
BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // *
BatchFlushTimeout_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Retries of insertion. If File.d cannot insert for this number of attempts,
+ // > File.d will fail with a non-zero exit code or skip the batch (see fatal_on_failed_insert).
+ Retry int `json:"retry" default:"10"` // *
+
+ // > @3@4@5@6
+ // >
+ // > After an insert error, whether to fail with a non-zero exit code or not
+ // > **Experimental feature**
+ FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Retention milliseconds for retry to Splunk.
+ Retention cfg.Duration `json:"retention" default:"1s" parse:"duration"` // *
+ Retention_ time.Duration
+
+ // > @3@4@5@6
+ // >
+ // > Multiplier for exponential increase of retention between retries
+ RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
}
type data struct {
@@ -107,10 +131,9 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.registerMetrics(params.MetricCtl)
p.client = p.newClient(p.config.RequestTimeout_)
- p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
+ batcherOpts := pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
- OutFn: p.out,
MaintenanceFn: p.maintenance,
Controller: p.controller,
Workers: p.config.WorkersCount_,
@@ -118,7 +141,32 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MetricCtl: params.MetricCtl,
- })
+ }
+
+ backoffOpts := pipeline.BackoffOpts{
+ MinRetention: p.config.Retention_,
+ Multiplier: float64(p.config.RetentionExponentMultiplier),
+ AttemptNum: uint64(p.config.Retry),
+ }
+
+ onError := func(err error) {
+ var level zapcore.Level
+ if p.config.FatalOnFailedInsert {
+ level = zapcore.FatalLevel
+ } else {
+ level = zapcore.ErrorLevel
+ }
+
+ p.logger.Desugar().Log(level, "can't send data to splunk", zap.Error(err),
+ zap.Int("retries", p.config.Retry))
+ }
+
+ p.batcher = pipeline.NewRetriableBatcher(
+ &batcherOpts,
+ p.out,
+ backoffOpts,
+ onError,
+ )
p.batcher.Start(context.TODO())
}
@@ -135,7 +183,7 @@ func (p *Plugin) Out(event *pipeline.Event) {
p.batcher.Add(event)
}
-func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
+func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
if *workerData == nil {
*workerData = &data{
outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
@@ -161,19 +209,15 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
p.logger.Debugf("trying to send: %s", outBuf)
- for {
- err := p.send(outBuf)
- if err != nil {
- p.sendErrorMetric.Inc()
- p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, err.Error())
- time.Sleep(time.Second)
-
- continue
- }
-
- break
+ err := p.send(outBuf)
+ if err != nil {
+ p.sendErrorMetric.Inc()
+ p.logger.Errorf("can't send data to splunk address=%s: %s", p.config.Endpoint, err.Error())
+ } else {
+ p.logger.Debugf("successfully sent: %s", outBuf)
}
- p.logger.Debugf("successfully sent: %s", outBuf)
+
+ return err
}
func (p *Plugin) maintenance(_ *pipeline.WorkerData) {}