Skip to content

Commit

Permalink
add sample logger for parse_re2, discard, throttle plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
DSmolonogov committed Jan 16, 2023
1 parent e592906 commit 7f847de
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 57 deletions.
60 changes: 42 additions & 18 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@ package logger
import (
"os"
"strings"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var Instance *zap.SugaredLogger
var SampleInstance *zap.SugaredLogger
var Level zap.AtomicLevel

const defaultLevel = zap.InfoLevel
const (
defaultLevel = zap.InfoLevel
defaultTick = time.Second
defaultFirst = 10
defaultThereAfter = 100
)

func init() {
var level zapcore.Level
Expand All @@ -32,27 +39,44 @@ func init() {

Level = zap.NewAtomicLevelAt(level)

core := zapcore.NewCore(
zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
TimeKey: "ts",
LevelKey: "level",
NameKey: "Instance",
CallerKey: "caller",
MessageKey: "message",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.SecondsDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}),
zapcore.AddSync(os.Stdout),
Level,
)

sampleCore := zapcore.NewSamplerWithOptions(
core,
defaultTick,
defaultFirst,
defaultThereAfter,
)

// logger initialization
Instance = zap.New(
zapcore.NewCore(
zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
TimeKey: "ts",
LevelKey: "level",
NameKey: "Instance",
CallerKey: "caller",
MessageKey: "message",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.SecondsDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}),
zapcore.AddSync(os.Stdout),
Level,
),
core,
).Sugar().Named("fd")

Instance.Infof("Logger initialized with level: %s", level)

// sample logger initialization
SampleInstance = zap.New(
sampleCore,
).Sugar().Named("fd")

SampleInstance.Infof("SampleLogger initialized with level: %s", level)
}

func Debug(args ...any) {
Expand Down
6 changes: 4 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type Pipeline struct {

// some debugging stuff
logger *zap.SugaredLogger
sampleLogger *zap.SugaredLogger
eventLogEnabled bool
eventLog []string
eventLogMu *sync.Mutex
Expand Down Expand Up @@ -143,6 +144,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli
pipeline := &Pipeline{
Name: name,
logger: logger.Instance.Named(name),
sampleLogger: logger.SampleInstance.Named(name),
settings: settings,
useSpread: false,
disableStreams: false,
Expand Down Expand Up @@ -266,7 +268,7 @@ func (p *Pipeline) Start() {
p.logger.Infof("stating processors, count=%d", len(p.Procs))
for _, processor := range p.Procs {
processor.registerMetrics(p.metricsCtl)
processor.start(p.actionParams, p.logger)
processor.start(p.actionParams, p.logger, p.sampleLogger)
}

p.logger.Infof("starting input plugin %q", p.inputInfo.Type)
Expand Down Expand Up @@ -571,7 +573,7 @@ func (p *Pipeline) expandProcs() {
proc := p.newProc()
p.Procs = append(p.Procs, proc)
proc.registerMetrics(p.metricsCtl)
proc.start(p.actionParams, p.logger)
proc.start(p.actionParams, p.logger, p.sampleLogger)
}

p.procCount.Swap(to)
Expand Down
5 changes: 3 additions & 2 deletions pipeline/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ type PluginDefaultParams struct {

type ActionPluginParams struct {
*PluginDefaultParams
Controller ActionPluginController
Logger *zap.SugaredLogger
Controller ActionPluginController
Logger *zap.SugaredLogger
SampleLogger *zap.SugaredLogger
}

type OutputPluginParams struct {
Expand Down
5 changes: 3 additions & 2 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,14 @@ func NewProcessor(
return processor
}

func (p *processor) start(params *PluginDefaultParams, logger *zap.SugaredLogger) {
func (p *processor) start(params *PluginDefaultParams, defaultLogger *zap.SugaredLogger, sampleLogger *zap.SugaredLogger) {
for i, action := range p.actions {
actionInfo := p.actionInfos[i]
action.Start(actionInfo.PluginStaticInfo.Config, &ActionPluginParams{
PluginDefaultParams: params,
Controller: p,
Logger: logger.Named("action").Named(actionInfo.Type),
Logger: defaultLogger.Named("action").Named(actionInfo.Type),
SampleLogger: sampleLogger.Named("action").Named(actionInfo.Type),
})
}

Expand Down
3 changes: 2 additions & 1 deletion plugin/action/discard/README.idoc.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Discard plugin
@introduction

> No config params
### Config params
@config-params|description
8 changes: 7 additions & 1 deletion plugin/action/discard/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ pipelines:
...
```

> No config params
### Config params
**`is_logging`** *`bool`* *`default=false`*

Field that includes logging (with sampling).

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
21 changes: 18 additions & 3 deletions plugin/action/discard/discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin"
"go.uber.org/zap"
)

/*{ introduction
Expand All @@ -23,10 +24,19 @@ pipelines:
}*/

type Plugin struct {
config *Config
sampleLogger *zap.SugaredLogger
plugin.NoMetricsPlugin
}

type Config struct{}
// ! config-params
// ^ config-params
type Config struct {
// > @3@4@5@6
// >
// > Field that includes logging (with sampling).
IsLogging bool `json:"is_logging" default:"false"` // *
}

func init() {
fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{
Expand All @@ -39,12 +49,17 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
return &Plugin{}, &Config{}
}

func (p *Plugin) Start(_ pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
p.config = config.(*Config)
p.sampleLogger = params.SampleLogger
}

func (p *Plugin) Stop() {
}

func (p *Plugin) Do(_ *pipeline.Event) pipeline.ActionResult {
func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
if p.config.IsLogging {
p.sampleLogger.Info("discarded event: ", zap.Stringer("json", event))
}
return pipeline.ActionDiscard
}
8 changes: 4 additions & 4 deletions plugin/action/discard/discard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestDiscardAnd(t *testing.T) {
},
}

p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeAnd, conds, false))
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeAnd, conds, false))

wg := &sync.WaitGroup{}
wg.Add(10)
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestDiscardOr(t *testing.T) {
},
}

p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeOr, conds, false))
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeOr, conds, false))

wg := &sync.WaitGroup{}
wg.Add(8)
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestDiscardRegex(t *testing.T) {
},
}

p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeOr, conds, false))
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeOr, conds, false))

wg := &sync.WaitGroup{}
wg.Add(11)
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestDiscardMatchInvert(t *testing.T) {
},
}

p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeAnd, conds, true))
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeAnd, conds, true))

wg := &sync.WaitGroup{}
wg.Add(9)
Expand Down
3 changes: 2 additions & 1 deletion plugin/action/mask/mask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ func TestMaskAddExtraField(t *testing.T) {
PipelineName: "test_pipeline",
PipelineSettings: &pipeline.Settings{},
},
Logger: zap.L().Sugar(),
Logger: zap.L().Sugar(),
SampleLogger: zap.L().Sugar(),
})
plugin.config.Masks[0].Re_ = regexp.MustCompile(plugin.config.Masks[0].Re)

Expand Down
6 changes: 6 additions & 0 deletions plugin/action/parse_re2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,11 @@ A prefix to add to decoded object keys.

<br>

**`is_logging`** *`bool`* *`default=false`*

Field that includes logging (with sampling).

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
18 changes: 14 additions & 4 deletions plugin/action/parse_re2/parse_re2.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import (
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
)

/*{ introduction
It parses string from the event field using re2 expression with named subgroups and merges the result with the event root.
}*/

type Plugin struct {
config *Config

re *regexp.Regexp
config *Config
sampleLogger *zap.SugaredLogger
re *regexp.Regexp
plugin.NoMetricsPlugin
}

Expand All @@ -39,6 +40,11 @@ type Config struct {
// >
// > A prefix to add to decoded object keys.
Prefix string `json:"prefix" default:""` // *

// > @3@4@5@6
// >
// > Field that includes logging (with sampling).
IsLogging bool `json:"is_logging" default:"false"` // *
}

func init() {
Expand All @@ -52,8 +58,9 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
return &Plugin{}, &Config{}
}

func (p *Plugin) Start(config pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
p.config = config.(*Config)
p.sampleLogger = params.SampleLogger

p.re = regexp.MustCompile(p.config.Re2)
}
Expand All @@ -70,6 +77,9 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
sm := p.re.FindSubmatch(jsonNode.AsBytes())

if len(sm) == 0 {
if p.config.IsLogging {
p.sampleLogger.Info("event is not parsed: ", zap.Stringer("json", event))
}
return pipeline.ActionPass
}

Expand Down
27 changes: 15 additions & 12 deletions plugin/action/throttle/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ It discards the events if pipeline throughput gets higher than a configured thre
}*/

type Plugin struct {
ctx context.Context
cancel context.CancelFunc
logger *zap.SugaredLogger
config *Config
pipeline string
format string
redisClient redisClient
ctx context.Context
cancel context.CancelFunc
logger *zap.SugaredLogger
sampleLogger *zap.SugaredLogger
config *Config
pipeline string
format string
redisClient redisClient

limiterBuf []byte
rules []*rule
Expand Down Expand Up @@ -185,6 +186,7 @@ func (p *Plugin) syncWorker(ctx context.Context, jobCh <-chan limiter, wg *sync.
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
p.config = config.(*Config)
p.logger = params.Logger
p.sampleLogger = params.SampleLogger
p.pipeline = params.PipelineName
p.limiterBuf = make([]byte, 0)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -303,11 +305,12 @@ func (p *Plugin) isAllowed(event *pipeline.Event) bool {

if len(p.config.TimeField_) != 0 {
tsValue := event.Root.Dig(p.config.TimeField_...).AsString()
t, err := time.Parse(p.format, tsValue)
if err != nil || ts.IsZero() {
p.logger.Warnf("can't parse field %q using format %s: %s", p.config.TimeField, p.config.TimeFieldFormat, tsValue)
} else {
ts = t
if tsValue != "" {
if t, err := time.Parse(p.format, tsValue); err != nil || ts.IsZero() {
p.sampleLogger.Warnf("can't parse field=%q using format=%s time=%s", p.config.TimeField, p.config.TimeFieldFormat, tsValue)
} else {
ts = t
}
}
}

Expand Down
Loading

0 comments on commit 7f847de

Please sign in to comment.