diff --git a/logger/logger.go b/logger/logger.go
index 1fab0428f..b0556e501 100644
--- a/logger/logger.go
+++ b/logger/logger.go
@@ -3,15 +3,22 @@ package logger
import (
"os"
"strings"
+ "time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var Instance *zap.SugaredLogger
+var SampleInstance *zap.SugaredLogger
var Level zap.AtomicLevel
-const defaultLevel = zap.InfoLevel
+const (
+ defaultLevel = zap.InfoLevel
+ defaultTick = time.Second
+ defaultFirst = 10
+ defaultThereafter = 100
+)
func init() {
var level zapcore.Level
@@ -32,27 +39,44 @@ func init() {
Level = zap.NewAtomicLevelAt(level)
+ core := zapcore.NewCore(
+ zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
+ TimeKey: "ts",
+ LevelKey: "level",
+ NameKey: "Instance",
+ CallerKey: "caller",
+ MessageKey: "message",
+ StacktraceKey: "stacktrace",
+ LineEnding: zapcore.DefaultLineEnding,
+ EncodeLevel: zapcore.LowercaseLevelEncoder,
+ EncodeTime: zapcore.ISO8601TimeEncoder,
+ EncodeDuration: zapcore.SecondsDurationEncoder,
+ EncodeCaller: zapcore.ShortCallerEncoder,
+ }),
+ zapcore.AddSync(os.Stdout),
+ Level,
+ )
+
+ sampleCore := zapcore.NewSamplerWithOptions(
+ core,
+ defaultTick,
+ defaultFirst,
+ defaultThereafter,
+ )
+
+ // default logger initialization
Instance = zap.New(
- zapcore.NewCore(
- zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
- TimeKey: "ts",
- LevelKey: "level",
- NameKey: "Instance",
- CallerKey: "caller",
- MessageKey: "message",
- StacktraceKey: "stacktrace",
- LineEnding: zapcore.DefaultLineEnding,
- EncodeLevel: zapcore.LowercaseLevelEncoder,
- EncodeTime: zapcore.ISO8601TimeEncoder,
- EncodeDuration: zapcore.SecondsDurationEncoder,
- EncodeCaller: zapcore.ShortCallerEncoder,
- }),
- zapcore.AddSync(os.Stdout),
- Level,
- ),
+ core,
).Sugar().Named("fd")
Instance.Infof("Logger initialized with level: %s", level)
+
+ // sample logger initialization
+ SampleInstance = zap.New(
+ sampleCore,
+ ).Sugar().Named("fd")
+
+ SampleInstance.Infof("SampleLogger initialized with level: %s", level)
}
func Debug(args ...any) {
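
For reference, here is a minimal standalone sketch (not part of the change) of what the sampler settings above do: with tick=1s, first=10 and thereafter=100, zap counts entries per distinct message within each tick, writes the first 10, and then only every 100th after that.

```go
package main

import (
	"os"
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

func main() {
	core := zapcore.NewCore(
		zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()),
		zapcore.AddSync(os.Stdout),
		zap.InfoLevel,
	)
	// Same settings as in init() above: within each 1-second tick, the first 10
	// entries with a given message are written, then only every 100th.
	sampled := zap.New(zapcore.NewSamplerWithOptions(core, time.Second, 10, 100)).Sugar()

	for i := 0; i < 1000; i++ {
		// The constant message keeps all calls in one sampling bucket, so only
		// about 20 of the 1000 lines actually reach stdout.
		sampled.Infow("discarded event", "i", i)
	}
}
```

This is also why the structured `Infow` form is preferable to concatenating the event into the message: a unique message per event would put every entry into its own sampling bucket and defeat the sampler.
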
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index 52e71871e..9d6935e14 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -99,6 +99,7 @@ type Pipeline struct {
// some debugging stuff
logger *zap.SugaredLogger
+ sampleLogger *zap.SugaredLogger
eventLogEnabled bool
eventLog []string
eventLogMu *sync.Mutex
@@ -143,6 +144,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli
pipeline := &Pipeline{
Name: name,
logger: logger.Instance.Named(name),
+ sampleLogger: logger.SampleInstance.Named(name),
settings: settings,
useSpread: false,
disableStreams: false,
@@ -266,7 +268,7 @@ func (p *Pipeline) Start() {
p.logger.Infof("starting processors, count=%d", len(p.Procs))
for _, processor := range p.Procs {
processor.registerMetrics(p.metricsCtl)
- processor.start(p.actionParams, p.logger)
+ processor.start(p.actionParams, p.logger, p.sampleLogger)
}
p.logger.Infof("starting input plugin %q", p.inputInfo.Type)
@@ -571,7 +573,7 @@ func (p *Pipeline) expandProcs() {
proc := p.newProc()
p.Procs = append(p.Procs, proc)
proc.registerMetrics(p.metricsCtl)
- proc.start(p.actionParams, p.logger)
+ proc.start(p.actionParams, p.logger, p.sampleLogger)
}
p.procCount.Swap(to)
diff --git a/pipeline/plugin.go b/pipeline/plugin.go
index 36fd88bdd..2afaaa0de 100644
--- a/pipeline/plugin.go
+++ b/pipeline/plugin.go
@@ -52,8 +52,9 @@ type PluginDefaultParams struct {
type ActionPluginParams struct {
*PluginDefaultParams
- Controller ActionPluginController
- Logger *zap.SugaredLogger
+ Controller ActionPluginController
+ Logger *zap.SugaredLogger
+ SampleLogger *zap.SugaredLogger
}
type OutputPluginParams struct {
diff --git a/pipeline/processor.go b/pipeline/processor.go
index ca666350f..2cb9cd947 100644
--- a/pipeline/processor.go
+++ b/pipeline/processor.go
@@ -94,13 +94,14 @@ func NewProcessor(
return processor
}
-func (p *processor) start(params *PluginDefaultParams, logger *zap.SugaredLogger) {
+func (p *processor) start(params *PluginDefaultParams, defaultLogger *zap.SugaredLogger, sampleLogger *zap.SugaredLogger) {
for i, action := range p.actions {
actionInfo := p.actionInfos[i]
action.Start(actionInfo.PluginStaticInfo.Config, &ActionPluginParams{
PluginDefaultParams: params,
Controller: p,
- Logger: logger.Named("action").Named(actionInfo.Type),
+ Logger: defaultLogger.Named("action").Named(actionInfo.Type),
+ SampleLogger: sampleLogger.Named("action").Named(actionInfo.Type),
})
}
diff --git a/plugin/action/discard/README.idoc.md b/plugin/action/discard/README.idoc.md
index a41776b0c..f56a628de 100644
--- a/plugin/action/discard/README.idoc.md
+++ b/plugin/action/discard/README.idoc.md
@@ -1,4 +1,5 @@
# Discard plugin
@introduction
-> No config params
+### Config params
+@config-params|description
diff --git a/plugin/action/discard/README.md b/plugin/action/discard/README.md
index d856baba0..acfc92333 100755
--- a/plugin/action/discard/README.md
+++ b/plugin/action/discard/README.md
@@ -13,6 +13,12 @@ pipelines:
...
```
-> No config params
+### Config params
+**`is_logging`** *`bool`* *`default=false`*
+
+Enables logging of discarded events (with sampling).
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/action/discard/discard.go b/plugin/action/discard/discard.go
index cbc156069..0589031d4 100644
--- a/plugin/action/discard/discard.go
+++ b/plugin/action/discard/discard.go
@@ -4,6 +4,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin"
+ "go.uber.org/zap"
)
/*{ introduction
@@ -23,10 +24,19 @@ pipelines:
}*/
type Plugin struct {
+ config *Config
+ sampleLogger *zap.SugaredLogger
plugin.NoMetricsPlugin
}
-type Config struct{}
+// ! config-params
+// ^ config-params
+type Config struct {
+ // > @3@4@5@6
+ // >
+ // > Enables logging of discarded events (with sampling).
+ IsLogging bool `json:"is_logging" default:"false"` // *
+}
func init() {
fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{
@@ -39,12 +49,17 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
return &Plugin{}, &Config{}
}
-func (p *Plugin) Start(_ pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
+func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
+ p.config = config.(*Config)
+ p.sampleLogger = params.SampleLogger
}
func (p *Plugin) Stop() {
}
-func (p *Plugin) Do(_ *pipeline.Event) pipeline.ActionResult {
+func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
+ if p.config.IsLogging {
+ p.sampleLogger.Infow("discarded event", zap.Stringer("json", event))
+ }
return pipeline.ActionDiscard
}
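
As a hedged illustration (not part of the diff) of how the new flag travels through `ActionPluginParams`, the plugin can be wired up by hand the same way the mask test below does; the pipeline name here is made up.

```go
package main

import (
	"github.com/ozontech/file.d/pipeline"
	"github.com/ozontech/file.d/plugin/action/discard"
	"go.uber.org/zap"
)

func main() {
	// Illustrative wiring only: in a real pipeline processor.start fills these
	// params in, and SampleLogger comes from logger.SampleInstance.
	p := &discard.Plugin{}
	p.Start(&discard.Config{IsLogging: true}, &pipeline.ActionPluginParams{
		PluginDefaultParams: &pipeline.PluginDefaultParams{
			PipelineName:     "example_pipeline", // made-up name
			PipelineSettings: &pipeline.Settings{},
		},
		Logger:       zap.L().Sugar(),
		SampleLogger: zap.L().Sugar(),
	})
	defer p.Stop()
}
```

With `IsLogging` enabled, `Do` reports each dropped event through the sampled logger before returning `ActionDiscard`.
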
diff --git a/plugin/action/discard/discard_test.go b/plugin/action/discard/discard_test.go
index 5e424c2c4..4390b5752 100644
--- a/plugin/action/discard/discard_test.go
+++ b/plugin/action/discard/discard_test.go
@@ -22,7 +22,7 @@ func TestDiscardAnd(t *testing.T) {
},
}
- p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeAnd, conds, false))
+ p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeAnd, conds, false))
wg := &sync.WaitGroup{}
wg.Add(10)
@@ -66,7 +66,7 @@ func TestDiscardOr(t *testing.T) {
},
}
- p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeOr, conds, false))
+ p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeOr, conds, false))
wg := &sync.WaitGroup{}
wg.Add(8)
@@ -110,7 +110,7 @@ func TestDiscardRegex(t *testing.T) {
},
}
- p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeOr, conds, false))
+ p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeOr, conds, false))
wg := &sync.WaitGroup{}
wg.Add(11)
@@ -151,7 +151,7 @@ func TestDiscardMatchInvert(t *testing.T) {
},
}
- p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeAnd, conds, true))
+ p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeAnd, conds, true))
wg := &sync.WaitGroup{}
wg.Add(9)
diff --git a/plugin/action/mask/mask_test.go b/plugin/action/mask/mask_test.go
index b0572b180..0072d6c4d 100644
--- a/plugin/action/mask/mask_test.go
+++ b/plugin/action/mask/mask_test.go
@@ -179,7 +179,8 @@ func TestMaskAddExtraField(t *testing.T) {
PipelineName: "test_pipeline",
PipelineSettings: &pipeline.Settings{},
},
- Logger: zap.L().Sugar(),
+ Logger: zap.L().Sugar(),
+ SampleLogger: zap.L().Sugar(),
})
plugin.config.Masks[0].Re_ = regexp.MustCompile(plugin.config.Masks[0].Re)
diff --git a/plugin/action/parse_re2/README.md b/plugin/action/parse_re2/README.md
index c4ab67c7a..25c919fc9 100755
--- a/plugin/action/parse_re2/README.md
+++ b/plugin/action/parse_re2/README.md
@@ -14,5 +14,11 @@ A prefix to add to decoded object keys.
+**`is_logging`** *`bool`* *`default=false`*
+
+Enables logging of events that could not be parsed (with sampling).
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/action/parse_re2/parse_re2.go b/plugin/action/parse_re2/parse_re2.go
index 7d58e8b6b..1149180bb 100644
--- a/plugin/action/parse_re2/parse_re2.go
+++ b/plugin/action/parse_re2/parse_re2.go
@@ -8,6 +8,7 @@ import (
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin"
insaneJSON "github.com/vitkovskii/insane-json"
+ "go.uber.org/zap"
)
/*{ introduction
@@ -15,9 +16,9 @@ It parses string from the event field using re2 expression with named subgroups
}*/
type Plugin struct {
- config *Config
-
- re *regexp.Regexp
+ config *Config
+ sampleLogger *zap.SugaredLogger
+ re *regexp.Regexp
plugin.NoMetricsPlugin
}
@@ -39,6 +40,11 @@ type Config struct {
// >
// > A prefix to add to decoded object keys.
Prefix string `json:"prefix" default:""` // *
+
+ // > @3@4@5@6
+ // >
+ // > Enables logging of events that could not be parsed (with sampling).
+ IsLogging bool `json:"is_logging" default:"false"` // *
}
func init() {
@@ -52,8 +58,9 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
return &Plugin{}, &Config{}
}
-func (p *Plugin) Start(config pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
+func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
p.config = config.(*Config)
+ p.sampleLogger = params.SampleLogger
p.re = regexp.MustCompile(p.config.Re2)
}
@@ -70,6 +77,9 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
sm := p.re.FindSubmatch(jsonNode.AsBytes())
if len(sm) == 0 {
+ if p.config.IsLogging {
+ p.sampleLogger.Infow("can't parse event", zap.Stringer("json", event))
+ }
return pipeline.ActionPass
}
diff --git a/plugin/action/throttle/throttle.go b/plugin/action/throttle/throttle.go
index 511dd2ae5..6c46f9b9f 100644
--- a/plugin/action/throttle/throttle.go
+++ b/plugin/action/throttle/throttle.go
@@ -45,13 +45,14 @@ It discards the events if pipeline throughput gets higher than a configured thre
}*/
type Plugin struct {
- ctx context.Context
- cancel context.CancelFunc
- logger *zap.SugaredLogger
- config *Config
- pipeline string
- format string
- redisClient redisClient
+ ctx context.Context
+ cancel context.CancelFunc
+ logger *zap.SugaredLogger
+ sampleLogger *zap.SugaredLogger
+ config *Config
+ pipeline string
+ format string
+ redisClient redisClient
limiterBuf []byte
rules []*rule
@@ -185,6 +186,7 @@ func (p *Plugin) syncWorker(ctx context.Context, jobCh <-chan limiter, wg *sync.
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
p.config = config.(*Config)
p.logger = params.Logger
+ p.sampleLogger = params.SampleLogger
p.pipeline = params.PipelineName
p.limiterBuf = make([]byte, 0)
ctx, cancel := context.WithCancel(context.Background())
@@ -303,11 +305,12 @@ func (p *Plugin) isAllowed(event *pipeline.Event) bool {
if len(p.config.TimeField_) != 0 {
tsValue := event.Root.Dig(p.config.TimeField_...).AsString()
- t, err := time.Parse(p.format, tsValue)
- if err != nil || ts.IsZero() {
- p.logger.Warnf("can't parse field %q using format %s: %s", p.config.TimeField, p.config.TimeFieldFormat, tsValue)
- } else {
- ts = t
+ if tsValue != "" {
+ if t, err := time.Parse(p.format, tsValue); err != nil || ts.IsZero() {
+ p.sampleLogger.Warnf("can't parse field=%q using format=%s time=%s", p.config.TimeField, p.config.TimeFieldFormat, tsValue)
+ } else {
+ ts = t
+ }
}
}
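
The restructured branch above amounts to a three-way decision; the sketch below (standalone, with a hypothetical helper name, and leaving the IsZero condition aside) shows it in isolation: an empty time field keeps the default timestamp silently, an unparseable one keeps it and warns through the sampled logger, and only a successfully parsed value replaces it.

```go
package main

import (
	"fmt"
	"time"
)

// resolveEventTime is a hypothetical helper mirroring the simplified flow:
// keep time.Now() unless the event carries a non-empty, parseable time field.
func resolveEventTime(format, tsValue string, warnf func(string, ...any)) time.Time {
	ts := time.Now()
	if tsValue == "" {
		return ts // empty or missing field: no warning anymore
	}
	t, err := time.Parse(format, tsValue)
	if err != nil {
		// in the plugin this goes through the sampled logger, so a flood of
		// bad timestamps produces only a trickle of warnings
		warnf("can't parse time=%s: %s", tsValue, err)
		return ts
	}
	return t
}

func main() {
	warnf := func(f string, args ...any) { fmt.Printf(f+"\n", args...) }
	fmt.Println(resolveEventTime(time.RFC3339, "2023-05-01T10:00:00Z", warnf))
	fmt.Println(resolveEventTime(time.RFC3339, "not-a-time", warnf))
	fmt.Println(resolveEventTime(time.RFC3339, "", warnf))
}
```
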
diff --git a/plugin/input/k8s/multiline_action_test.go b/plugin/input/k8s/multiline_action_test.go
index 40ce06f40..65e1f9ccb 100644
--- a/plugin/input/k8s/multiline_action_test.go
+++ b/plugin/input/k8s/multiline_action_test.go
@@ -17,11 +17,15 @@ func TestMultilineAction_Do(t *testing.T) {
config := &Config{
SplitEventSize: predictionLookahead * 4,
}
- plugin.Start(config, &pipeline.ActionPluginParams{Logger: zap.S(), PluginDefaultParams: &pipeline.PluginDefaultParams{
- PipelineSettings: &pipeline.Settings{
- MaxEventSize: 20,
+ plugin.Start(config, &pipeline.ActionPluginParams{
+ Logger: zap.S(),
+ SampleLogger: zap.S(),
+ PluginDefaultParams: &pipeline.PluginDefaultParams{
+ PipelineSettings: &pipeline.Settings{
+ MaxEventSize: 20,
+ },
},
- }})
+ })
item := &metaItem{
nodeName: "node_1",
@@ -93,9 +97,13 @@ func TestMultilineAction_Do_shouldSplit(t *testing.T) {
config := &Config{
SplitEventSize: predictionLookahead * 4,
}
- plugin.Start(config, &pipeline.ActionPluginParams{Logger: zap.S(), PluginDefaultParams: &pipeline.PluginDefaultParams{
- PipelineSettings: &pipeline.Settings{},
- }})
+ plugin.Start(config, &pipeline.ActionPluginParams{
+ Logger: zap.S(),
+ SampleLogger: zap.S(),
+ PluginDefaultParams: &pipeline.PluginDefaultParams{
+ PipelineSettings: &pipeline.Settings{},
+ },
+ })
item := &metaItem{
nodeName: "node_1",