Commit
use fatal_on_failed_insert
DmitryRomanov committed Nov 29, 2023
1 parent 2568acd commit 4377995
Showing 19 changed files with 208 additions and 99 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -13,6 +13,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/alicebob/miniredis/v2 v2.30.5
github.com/bitly/go-simplejson v0.5.1
github.com/cenkalti/backoff/v3 v3.0.0
github.com/euank/go-kmsg-parser v2.0.0+incompatible
github.com/go-redis/redis v6.15.9+incompatible
github.com/golang/mock v1.6.0
@@ -47,7 +48,6 @@ require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v3 v3.0.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cilium/ebpf v0.9.1 // indirect
github.com/containerd/cgroups/v3 v3.0.1 // indirect
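`github.com/cenkalti/backoff/v3` moves from the indirect block to the direct requirements because the output plugins below now construct their retry policy through it. For orientation, a minimal sketch of typical backoff/v3 usage (illustrative values and names, not file.d code):

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v3"
)

func main() {
	// Exponential policy: the wait grows by Multiplier after each failed attempt.
	exp := backoff.NewExponentialBackOff()
	exp.InitialInterval = 50 * time.Millisecond
	exp.Multiplier = 2

	attempts := 0
	insert := func() error { // stand-in for an insert that fails twice
		attempts++
		if attempts < 3 {
			return errors.New("temporary failure")
		}
		return nil
	}

	// Cap the retries; Retry returns the last error if the cap is exhausted.
	err := backoff.Retry(insert, backoff.WithMaxRetries(exp, 10))
	fmt.Println("attempts:", attempts, "err:", err)
}
```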
6 changes: 3 additions & 3 deletions plugin/README.md
@@ -598,9 +598,9 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
actions:
- type: json_decode
field: message
actions:
- type: json_decode
field: message
output:
type: s3
file_config:
6 changes: 3 additions & 3 deletions plugin/output/README.md
@@ -72,9 +72,9 @@ pipelines:
type: http
emulate_mode: "no"
address: ":9200"
actions:
- type: json_decode
field: message
actions:
- type: json_decode
field: message
output:
type: s3
file_config:
10 changes: 5 additions & 5 deletions plugin/output/clickhouse/README.md
@@ -95,16 +95,16 @@ If the strict mode is enabled file.d fails (exit with code 1) in above examples.

<br>

**`retry`** *`uint64`* *`default=10`*
**`retry`** *`int`* *`default=10`*

Retries of insertion. If File.d cannot insert for this number of attempts,
File.d will fall with non-zero exit code or skip message (see skip_failed_insert).
File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).

<br>

**`skip_failed_insert`** *`bool`* *`default=false`*
**`fatal_on_failed_insert`** *`bool`* *`default=false`*

After an insert error, fall with a non-zero exit code or skip the message
After an insert error, fall with a non-zero exit code or not
**Experimental feature**

<br>
@@ -122,7 +122,7 @@ Retention milliseconds for retry to DB.

<br>

**`retention_exponentially_multiplier`** *`float64`* *`default=1`*
**`retention_exponentially_multiplier`** *`int`* *`default=2`*

Multiplier for exponentially increase retention beetween retries

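To make the two retention settings in this README concrete: the delay before retry *n* grows roughly as `retention * retention_exponentially_multiplier^n`. A tiny sketch with illustrative values (the real delays come from the backoff library, which may additionally randomize and cap them, so treat this as an approximation):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	retention := 50 * time.Millisecond // "retention": initial wait before the first retry (illustrative)
	multiplier := 2.0                  // "retention_exponentially_multiplier"

	delay := retention
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Printf("retry %d after ~%v\n", attempt, delay)
		delay = time.Duration(float64(delay) * multiplier) // grow exponentially
	}
}
```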
25 changes: 11 additions & 14 deletions plugin/output/clickhouse/clickhouse.go
@@ -186,14 +186,14 @@ type Config struct {
// > @3@4@5@6
// >
// > Retries of insertion. If File.d cannot insert for this number of attempts,
// > File.d will fall with non-zero exit code or skip message (see skip_failed_insert).
Retry uint64 `json:"retry" default:"10"` // *
// > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).
Retry int `json:"retry" default:"10"` // *

// > @3@4@5@6
// >
// > After an insert error, fall with a non-zero exit code or skip the message
// > After an insert error, fall with a non-zero exit code or not
// > **Experimental feature**
SkipFailedInsert bool `json:"skip_failed_insert" default:"false"` // *
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *

// > @3@4@5@6
// >
@@ -210,7 +210,7 @@ type Config struct {
// > @3@4@5@6
// >
// > Multiplier for exponentially increase retention beetween retries
RetentionExponentMultiplier float64 `json:"retention_exponentially_multiplier" default:"1"` // *
RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *

// > @3@4@5@6
// >
@@ -298,9 +298,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.registerMetrics(params.MetricCtl)
p.ctx, p.cancelFunc = context.WithCancel(context.Background())

if p.config.Retry < 1 {
p.logger.Fatal("'retry' can't be <1")
}
if p.config.Retention_ < 1 {
p.logger.Fatal("'retention' can't be <1")
}
@@ -375,8 +372,8 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP

p.backoff = cfg.GetBackoff(
p.config.Retention_,
p.config.RetentionExponentMultiplier,
p.config.Retry,
float64(p.config.RetentionExponentMultiplier),
uint64(p.config.Retry),
)

p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
@@ -477,14 +474,14 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {

if err != nil {
var errLogFunc func(msg string, fields ...zap.Field)
if p.config.SkipFailedInsert {
errLogFunc = p.logger.Error
} else {
if p.config.FatalOnFailedInsert {
errLogFunc = p.logger.Fatal
} else {
errLogFunc = p.logger.Error
}

errLogFunc("can't insert to the table", zap.Error(err),
zap.Uint64("retries", p.config.Retry),
zap.Int("retries", p.config.Retry),
zap.String("table", p.config.Table))
}
}
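The `Start` hunk above now passes `float64(RetentionExponentMultiplier)` and `uint64(Retry)` into `cfg.GetBackoff`. That helper is not part of this diff; assuming it wraps backoff/v3, its shape is presumably close to the sketch below (an illustration only, not the actual file.d implementation):

```go
package cfgsketch

import (
	"time"

	"github.com/cenkalti/backoff/v3"
)

// GetBackoff sketches what a helper like cfg.GetBackoff is assumed to do:
// build an exponential policy from the plugin settings and cap the number of retries.
func GetBackoff(retention time.Duration, multiplier float64, retries uint64) backoff.BackOff {
	exp := backoff.NewExponentialBackOff()
	exp.InitialInterval = retention // configured "retention"
	exp.Multiplier = multiplier     // configured "retention_exponentially_multiplier"
	exp.MaxElapsedTime = 0          // stop on the retry cap, not on elapsed time
	return backoff.WithMaxRetries(exp, retries)
}
```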
10 changes: 5 additions & 5 deletions plugin/output/elasticsearch/README.md
@@ -94,16 +94,16 @@ Operation type to be used in batch requests. It can be `index` or `create`. Defa
<br>

**`retry`** *`uint64`* *`default=0`*
**`retry`** *`int`* *`default=10`*

Retries of insertion. If File.d cannot insert for this number of attempts,
File.d will fall with non-zero exit code or skip message (see skip_failed_insert).
File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).

<br>

**`skip_failed_insert`** *`bool`* *`default=false`*
**`fatal_on_failed_insert`** *`bool`* *`default=false`*

After an insert error, fall with a non-zero exit code or skip the message
After an insert error, fall with a non-zero exit code or not
**Experimental feature**

<br>
@@ -114,7 +114,7 @@ Retention milliseconds for retry to DB.

<br>

**`retention_exponentially_multiplier`** *`float64`* *`default=1`*
**`retention_exponentially_multiplier`** *`int`* *`default=2`*

Multiplier for exponentially increase retention beetween retries

22 changes: 11 additions & 11 deletions plugin/output/elasticsearch/elasticsearch.go
@@ -145,14 +145,14 @@ type Config struct {
// > @3@4@5@6
// >
// > Retries of insertion. If File.d cannot insert for this number of attempts,
// > File.d will fall with non-zero exit code or skip message (see skip_failed_insert).
Retry uint64 `json:"retry" default:"0"` // *
// > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).
Retry int `json:"retry" default:"10"` // *

// > @3@4@5@6
// >
// > After an insert error, fall with a non-zero exit code or skip the message
// > After an insert error, fall with a non-zero exit code or not
// > **Experimental feature**
SkipFailedInsert bool `json:"skip_failed_insert" default:"false"` // *
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *

// > @3@4@5@6
// >
@@ -163,7 +163,7 @@ type Config struct {
// > @3@4@5@6
// >
// > Multiplier for exponentially increase retention beetween retries
RetentionExponentMultiplier float64 `json:"retention_exponentially_multiplier" default:"1"` // *
RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
}

type data struct {
@@ -247,8 +247,8 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP

p.backoff = cfg.GetBackoff(
p.config.Retention_,
p.config.RetentionExponentMultiplier,
p.config.Retry,
float64(p.config.RetentionExponentMultiplier),
uint64(p.config.Retry),
)

p.batcher.Start(ctx)
@@ -298,14 +298,14 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {

if err != nil {
var errLogFunc func(args ...interface{})
if p.config.SkipFailedInsert {
errLogFunc = p.logger.Error
} else {
if p.config.FatalOnFailedInsert {
errLogFunc = p.logger.Fatal
} else {
errLogFunc = p.logger.Error
}

errLogFunc("can't send to the elastic", zap.Error(err),
zap.Uint64("retries", p.config.Retry),
zap.Int("retries", p.config.Retry),
)
}
}
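Only the post-failure logging is visible in the `out` hunk above; the send itself runs through `backoff.Retry` with the policy stored in `p.backoff`, so each batch is retried with growing delays before the error branch is reached. A self-contained sketch of that consumption pattern (the `send` closure and the policy values are stand-ins):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/cenkalti/backoff/v3"
)

func main() {
	policy := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3)

	calls := 0
	send := func() error { // stand-in for sending one batch to the storage
		calls++
		return errors.New("storage unavailable")
	}

	// Retry resets the policy, runs send, and sleeps between failed attempts
	// until send succeeds or the retry cap is reached.
	err := backoff.Retry(send, policy)
	fmt.Printf("gave up after %d attempts: %v\n", calls, err)
}
```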
13 changes: 10 additions & 3 deletions plugin/output/gelf/README.md
@@ -118,10 +118,17 @@ After this timeout the batch will be sent even if batch isn't completed.

<br>

**`retry`** *`uint64`* *`default=0`*
**`retry`** *`int`* *`default=0`*

Retries of insertion. If File.d cannot insert for this number of attempts,
File.d will fall with non-zero exit code.
File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).

<br>

**`fatal_on_failed_insert`** *`bool`* *`default=false`*

After an insert error, fall with a non-zero exit code or not
**Experimental feature**

<br>

@@ -131,7 +138,7 @@ Retention milliseconds for retry to DB.

<br>

**`retention_exponentially_multiplier`** *`float64`* *`default=1`*
**`retention_exponentially_multiplier`** *`int`* *`default=2`*

Multiplier for exponentially increase retention beetween retries

27 changes: 20 additions & 7 deletions plugin/output/gelf/gelf.go
@@ -150,8 +150,14 @@ type Config struct {
// > @3@4@5@6
// >
// > Retries of insertion. If File.d cannot insert for this number of attempts,
// > File.d will fall with non-zero exit code.
Retry uint64 `json:"retry" default:"0"` // *
// > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).
Retry int `json:"retry" default:"0"` // *

// > @3@4@5@6
// >
// > After an insert error, fall with a non-zero exit code or not
// > **Experimental feature**
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *

// > @3@4@5@6
// >
@@ -162,7 +168,7 @@ type Config struct {
// > @3@4@5@6
// >
// > Multiplier for exponentially increase retention beetween retries
RetentionExponentMultiplier float64 `json:"retention_exponentially_multiplier" default:"1"` // *
RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *

// fields converted to extra fields GELF format
hostField string
@@ -200,8 +206,8 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP

p.backoff = cfg.GetBackoff(
p.config.Retention_,
p.config.RetentionExponentMultiplier,
p.config.Retry,
float64(p.config.RetentionExponentMultiplier),
uint64(p.config.Retry),
)

p.config.hostField = pipeline.ByteToStringUnsafe(p.formatExtraField(nil, p.config.HostField))
@@ -300,8 +306,15 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
}, p.backoff)

if err != nil {
p.logger.Fatal("can't send to gelf", zap.Error(err),
zap.Uint64("retries", p.config.Retry),
var errLogFunc func(args ...interface{})
if p.config.FatalOnFailedInsert {
errLogFunc = p.logger.Fatal
} else {
errLogFunc = p.logger.Error
}

errLogFunc("can't send to gelf", zap.Error(err),
zap.Int("retries", p.config.Retry),
)
}
}
13 changes: 10 additions & 3 deletions plugin/output/kafka/README.md
@@ -51,10 +51,17 @@ After this timeout the batch will be sent even if batch isn't full.

<br>

**`retry`** *`uint64`* *`default=10`*
**`retry`** *`int`* *`default=10`*

Retries of insertion. If File.d cannot insert for this number of attempts,
File.d will fall with non-zero exit code.
File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).

<br>

**`fatal_on_failed_insert`** *`bool`* *`default=false`*

After an insert error, fall with a non-zero exit code or not
**Experimental feature**

<br>

@@ -64,7 +71,7 @@ Retention milliseconds for retry.

<br>

**`retention_exponentially_multiplier`** *`float64`* *`default=1`*
**`retention_exponentially_multiplier`** *`int`* *`default=2`*

Multiplier for exponentially increase retention beetween retries

27 changes: 20 additions & 7 deletions plugin/output/kafka/kafka.go
@@ -95,8 +95,14 @@ type Config struct {
// > @3@4@5@6
// >
// > Retries of insertion. If File.d cannot insert for this number of attempts,
// > File.d will fall with non-zero exit code.
Retry uint64 `json:"retry" default:"10"` // *
// > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).
Retry int `json:"retry" default:"10"` // *

// > @3@4@5@6
// >
// > After an insert error, fall with a non-zero exit code or not
// > **Experimental feature**
FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *

// > @3@4@5@6
// >
@@ -107,7 +113,7 @@ type Config struct {
// > @3@4@5@6
// >
// > Multiplier for exponentially increase retention beetween retries
RetentionExponentMultiplier float64 `json:"retention_exponentially_multiplier" default:"1"` // *
RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *

// > @3@4@5@6
// >
@@ -169,8 +175,8 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
}
p.backoff = cfg.GetBackoff(
p.config.Retention_,
p.config.RetentionExponentMultiplier,
p.config.Retry,
float64(p.config.RetentionExponentMultiplier),
uint64(p.config.Retry),
)

p.logger.Infof("workers count=%d, batch size=%d", p.config.WorkersCount_, p.config.BatchSize_)
@@ -258,8 +264,15 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
}, p.backoff)

if err != nil {
p.logger.Error("can't write batch", zap.Error(err),
zap.Uint64("retries", p.config.Retry))
var errLogFunc func(args ...interface{})
if p.config.FatalOnFailedInsert {
errLogFunc = p.logger.Fatal
} else {
errLogFunc = p.logger.Error
}

errLogFunc("can't write batch", zap.Error(err),
zap.Int("retries", p.config.Retry))
}
}

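The same switch now appears in all four outputs: when the retries are exhausted, `fatal_on_failed_insert=true` terminates file.d through `logger.Fatal` (non-zero exit), while the default `false` logs the error and drops the batch so the pipeline keeps running. A compact stand-alone illustration of the pattern with zap (a sketch; the message and field names only mimic the diff):

```go
package main

import (
	"errors"

	"go.uber.org/zap"
)

// logInsertFailure picks the log level the way the output plugins do:
// Fatal exits the process with a non-zero code, Error records the failure
// and lets the pipeline continue (the current batch is lost).
func logInsertFailure(logger *zap.Logger, fatalOnFailedInsert bool, retries int, err error) {
	errLogFunc := logger.Error
	if fatalOnFailedInsert {
		errLogFunc = logger.Fatal
	}
	errLogFunc("can't insert the batch", zap.Error(err), zap.Int("retries", retries))
}

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	// With fatal_on_failed_insert=false the process survives the failure.
	logInsertFailure(logger, false, 10, errors.New("insert failed"))
}
```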