From d09a42febd845792a4a6e210dc01717e35a2cba1 Mon Sep 17 00:00:00 2001 From: Vadim Alekseev Date: Sun, 2 Jul 2023 18:17:07 +0300 Subject: [PATCH] Basic server implementation for the file.d playground --- cmd/file.d/file.d_test.go | 9 +- cmd/playground/playground.go | 42 +++++ fd/file.d.go | 46 +++-- fd/plugin_registry.go | 17 +- pipeline/event.go | 4 + pipeline/pipeline.go | 5 +- pipeline/pipeline_test.go | 5 +- pipeline/pipeline_whitebox_test.go | 3 +- pipeline/processor.go | 7 + playground/fakeclock.go | 26 +++ playground/server.go | 269 +++++++++++++++++++++++++++++ plugin/action/debug/debug.go | 27 ++- plugin/output/file/helpers_test.go | 3 +- plugin/output/s3/s3_test.go | 3 +- test/test.go | 2 +- 15 files changed, 425 insertions(+), 43 deletions(-) create mode 100644 cmd/playground/playground.go create mode 100644 playground/fakeclock.go create mode 100644 playground/server.go diff --git a/cmd/file.d/file.d_test.go b/cmd/file.d/file.d_test.go index f1b462ee9..82ae3613f 100644 --- a/cmd/file.d/file.d_test.go +++ b/cmd/file.d/file.d_test.go @@ -184,7 +184,8 @@ func TestThatPluginsAreImported(t *testing.T) { "throttle", } for _, pluginName := range action { - pluginInfo := fd.DefaultPluginRegistry.Get(pipeline.PluginKindAction, pluginName) + pluginInfo, err := fd.DefaultPluginRegistry.Get(pipeline.PluginKindAction, pluginName) + require.NoError(t, err) require.NotNil(t, pluginInfo) } @@ -196,7 +197,8 @@ func TestThatPluginsAreImported(t *testing.T) { "kafka", } for _, pluginName := range input { - pluginInfo := fd.DefaultPluginRegistry.Get(pipeline.PluginKindInput, pluginName) + pluginInfo, err := fd.DefaultPluginRegistry.Get(pipeline.PluginKindInput, pluginName) + require.NoError(t, err) require.NotNil(t, pluginInfo) } @@ -211,7 +213,8 @@ func TestThatPluginsAreImported(t *testing.T) { "stdout", } for _, pluginName := range output { - pluginInfo := fd.DefaultPluginRegistry.Get(pipeline.PluginKindOutput, pluginName) + pluginInfo, err := fd.DefaultPluginRegistry.Get(pipeline.PluginKindOutput, pluginName) + require.NoError(t, err) require.NotNil(t, pluginInfo) } } diff --git a/cmd/playground/playground.go b/cmd/playground/playground.go new file mode 100644 index 000000000..956b8c9a7 --- /dev/null +++ b/cmd/playground/playground.go @@ -0,0 +1,42 @@ +package main + +import ( + "net/http" + + "github.com/ozontech/file.d/fd" + "github.com/ozontech/file.d/logger" + "github.com/ozontech/file.d/playground" + + _ "github.com/ozontech/file.d/plugin/action/add_file_name" + _ "github.com/ozontech/file.d/plugin/action/add_host" + _ "github.com/ozontech/file.d/plugin/action/convert_date" + _ "github.com/ozontech/file.d/plugin/action/convert_log_level" + _ "github.com/ozontech/file.d/plugin/action/debug" + _ "github.com/ozontech/file.d/plugin/action/discard" + _ "github.com/ozontech/file.d/plugin/action/flatten" + _ "github.com/ozontech/file.d/plugin/action/join" + _ "github.com/ozontech/file.d/plugin/action/join_template" + _ "github.com/ozontech/file.d/plugin/action/json_decode" + _ "github.com/ozontech/file.d/plugin/action/json_encode" + _ "github.com/ozontech/file.d/plugin/action/keep_fields" + _ "github.com/ozontech/file.d/plugin/action/mask" + _ "github.com/ozontech/file.d/plugin/action/modify" + _ "github.com/ozontech/file.d/plugin/action/parse_es" + _ "github.com/ozontech/file.d/plugin/action/parse_re2" + _ "github.com/ozontech/file.d/plugin/action/remove_fields" + _ "github.com/ozontech/file.d/plugin/action/rename" + _ "github.com/ozontech/file.d/plugin/action/set_time" + + _ "github.com/ozontech/file.d/plugin/input/fake" + _ "github.com/ozontech/file.d/plugin/output/devnull" +) + +func main() { + lg := logger.Instance.Desugar().Named("playground") + handler := playground.NewDoActionsHandler(fd.DefaultPluginRegistry, lg) + + err := http.ListenAndServe(":9090", handler) + if err != nil && err != http.ErrServerClosed { + panic(err) + } +} diff --git a/fd/file.d.go b/fd/file.d.go index 8a4291305..3a3974d34 100644 --- a/fd/file.d.go +++ b/fd/file.d.go @@ -97,13 +97,16 @@ func (f *FileD) addPipeline(name string, config *cfg.PipelineConfig) { logger.Infof("creating pipeline %q: capacity=%d, stream field=%s, decoder=%s", name, settings.Capacity, settings.StreamField, settings.Decoder) - p := pipeline.New(name, settings, f.registry) + p := pipeline.New(name, settings, f.registry, logger.Instance.Named(name)) err := f.setupInput(p, config, values) if err != nil { logger.Fatalf("can't create pipeline %q: %s", name, err.Error()) } - f.setupActions(p, config, values) + actions := config.Raw.Get("actions") + if err := SetupActions(p, f.plugins, actions, values); err != nil { + logger.Fatalf("can't create pipeline %q: %s", name, err.Error()) + } err = f.setupOutput(p, config, values) if err != nil { @@ -126,7 +129,10 @@ func (f *FileD) setupInput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineCon }) for _, actionType := range inputInfo.AdditionalActions { - actionInfo := f.plugins.GetActionByType(actionType) + actionInfo, err := f.plugins.GetActionByType(actionType) + if err != nil { + return err + } infoCopy := *actionInfo infoCopy.Config = inputInfo.Config @@ -141,46 +147,50 @@ func (f *FileD) setupInput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineCon return nil } -func (f *FileD) setupActions(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineConfig, values map[string]int) { - actions := pipelineConfig.Raw.Get("actions") +func SetupActions(p *pipeline.Pipeline, plugins *PluginRegistry, actions *simplejson.Json, values map[string]int) error { for index := range actions.MustArray() { actionJSON := actions.GetIndex(index) if actionJSON.MustMap() == nil { - logger.Fatalf("empty action #%d for pipeline %q", index, p.Name) + return fmt.Errorf("empty action #%d for pipeline %q", index, p.Name) } t := actionJSON.Get("type").MustString() if t == "" { - logger.Fatalf("action #%d doesn't provide type %q", index, p.Name) + return fmt.Errorf("action #%d doesn't provide type %q", index, p.Name) + } + if err := setupAction(p, plugins, index, t, actionJSON, values); err != nil { + return err } - f.setupAction(p, index, t, actionJSON, values) } + return nil } -func (f *FileD) setupAction(p *pipeline.Pipeline, index int, t string, actionJSON *simplejson.Json, values map[string]int) { - logger.Infof("creating action with type %q for pipeline %q", t, p.Name) - info := f.plugins.GetActionByType(t) +func setupAction(p *pipeline.Pipeline, plugins *PluginRegistry, index int, t string, actionJSON *simplejson.Json, values map[string]int) error { + info, err := plugins.GetActionByType(t) + if err != nil { + return err + } matchMode := extractMatchMode(actionJSON) if matchMode == pipeline.MatchModeUnknown { - logger.Fatalf("unknown match_mode value for action %d/%s in pipeline %q", index, t, p.Name) + return fmt.Errorf("unknown match_mode value for action %d/%s", index, t) } matchInvert := extractMatchInvert(actionJSON) conditions, err := extractConditions(actionJSON.Get("match_fields")) if err != nil { - logger.Fatalf("can't extract conditions for action %d/%s in pipeline %q: %s", index, t, p.Name, err.Error()) + return fmt.Errorf("can't extract conditions for action %d/%s: %s", index, t, err.Error()) } metricName, metricLabels, skipStatus := extractMetrics(actionJSON) configJSON := makeActionJSON(actionJSON) _, config := info.Factory() if err := DecodeConfig(config, configJSON); err != nil { - logger.Fatalf("can't unmarshal config for %s action in pipeline %q: %s", info.Type, p.Name, err.Error()) + return fmt.Errorf("can't unmarshal config for %s action: %s", info.Type, err.Error()) } err = cfg.Parse(config, values) if err != nil { - logger.Fatalf("wrong config for %q action in pipeline %q: %s", info.Type, p.Name, err.Error()) + return fmt.Errorf("wrong config for %q action: %s", info.Type, err.Error()) } infoCopy := *info @@ -196,6 +206,7 @@ func (f *FileD) setupAction(p *pipeline.Pipeline, index int, t string, actionJSO MetricSkipStatus: skipStatus, MatchInvert: matchInvert, }) + return nil } func (f *FileD) setupOutput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineConfig, values map[string]int) error { @@ -232,7 +243,10 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip return nil, fmt.Errorf("%s doesn't have type", pluginKind) } logger.Infof("creating %s with type %q", pluginKind, t) - info := f.plugins.Get(pluginKind, t) + info, err := f.plugins.Get(pluginKind, t) + if err != nil { + return nil, err + } configJson, err := configJSON.Encode() if err != nil { logger.Panicf("can't create config json for %s", t) diff --git a/fd/plugin_registry.go b/fd/plugin_registry.go index acf3ec29a..574d66468 100644 --- a/fd/plugin_registry.go +++ b/fd/plugin_registry.go @@ -1,6 +1,8 @@ package fd import ( + "fmt" + "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" ) @@ -13,28 +15,25 @@ type PluginRegistry struct { plugins map[string]*pipeline.PluginStaticInfo } -func (r *PluginRegistry) Get(kind pipeline.PluginKind, t string) *pipeline.PluginStaticInfo { +func (r *PluginRegistry) Get(kind pipeline.PluginKind, t string) (*pipeline.PluginStaticInfo, error) { id := r.MakeID(kind, t) info := r.plugins[id] if info == nil { - logger.Fatalf("can't find plugin kind=%s type=%s", kind, t) - return nil + return nil, fmt.Errorf("can't find plugin kind=%s type=%s", kind, t) } - return info + return info, nil } -func (r *PluginRegistry) GetActionByType(t string) *pipeline.PluginStaticInfo { +func (r *PluginRegistry) GetActionByType(t string) (*pipeline.PluginStaticInfo, error) { id := r.MakeID(pipeline.PluginKindAction, t) info := r.plugins[id] if info == nil { - logger.Fatalf("can't find action plugin with type %q", t) - return nil + return nil, fmt.Errorf("can't find action plugin with type %q", t) } - - return info + return info, nil } func (r *PluginRegistry) RegisterInput(info *pipeline.PluginStaticInfo) { diff --git a/pipeline/event.go b/pipeline/event.go index 13c0b381b..f085bea73 100644 --- a/pipeline/event.go +++ b/pipeline/event.go @@ -191,6 +191,10 @@ func (e *Event) String() string { return fmt.Sprintf("kind=%s, action=%d, source=%d/%s, stream=%s, stage=%s, json=%s", e.kindStr(), e.action.Load(), e.SourceID, e.SourceName, e.streamName, e.stageStr(), e.Root.EncodeToString()) } +func (e *Event) Action() int { + return int(e.action.Load()) +} + // channels are slower than this implementation by ~20% type eventPool struct { capacity int diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index f98f6f274..ea6dcf66a 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -55,6 +55,7 @@ type InputPluginController interface { type ActionPluginController interface { Commit(event *Event) // commit offset of held event and skip further processing Propagate(event *Event) // throw held event back to pipeline + ActionTypeByIndex(idx int) (string, bool) } type OutputPluginController interface { @@ -139,11 +140,9 @@ type Settings struct { } // New creates new pipeline. Consider using `SetupHTTPHandlers` next. -func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeline { +func New(name string, settings *Settings, registry *prometheus.Registry, lg *zap.SugaredLogger) *Pipeline { metricCtl := metric.New("pipeline_"+name, registry) - lg := logger.Instance.Named(name) - pipeline := &Pipeline{ Name: name, logger: lg, diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index 6dce69506..f2d8338eb 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -8,6 +8,7 @@ import ( "github.com/ozontech/file.d/plugin/input/fake" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func getFakeInputInfo() *pipeline.InputPluginInfo { @@ -36,7 +37,7 @@ func TestInUnparsableMessages(t *testing.T) { sourceID := pipeline.SourceID(3<<16 + int(10)) t.Run(name, func(t *testing.T) { - pipe := pipeline.New("test_pipeline", pipelineSettings, prometheus.NewRegistry()) + pipe := pipeline.New("test_pipeline", pipelineSettings, prometheus.NewRegistry(), zap.NewNop().Sugar()) pipe.SetInput(getFakeInputInfo()) @@ -112,7 +113,7 @@ func TestInInvalidMessages(t *testing.T) { for _, tCase := range cases { t.Run(tCase.name, func(t *testing.T) { - pipe := pipeline.New("test_pipeline", tCase.pipelineSettings, prometheus.NewRegistry()) + pipe := pipeline.New("test_pipeline", tCase.pipelineSettings, prometheus.NewRegistry(), zap.NewNop().Sugar()) pipe.SetInput(getFakeInputInfo()) diff --git a/pipeline/pipeline_whitebox_test.go b/pipeline/pipeline_whitebox_test.go index 85a3fc0ec..5bfd8653a 100644 --- a/pipeline/pipeline_whitebox_test.go +++ b/pipeline/pipeline_whitebox_test.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "go.uber.org/atomic" + "go.uber.org/zap" ) func TestPipeline_streamEvent(t *testing.T) { @@ -13,7 +14,7 @@ func TestPipeline_streamEvent(t *testing.T) { Capacity: 5, Decoder: "json", } - p := New("test", settings, prometheus.NewRegistry()) + p := New("test", settings, prometheus.NewRegistry(), zap.NewNop().Sugar()) streamID := StreamID(123123) procs := int32(7) diff --git a/pipeline/processor.go b/pipeline/processor.go index a7c71a1a8..2bc993f48 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -349,3 +349,10 @@ func (p *processor) Propagate(event *Event) { func (p *processor) RecoverFromPanic() { p.recoverFromPanic() } + +func (p *processor) ActionTypeByIndex(idx int) (string, bool) { + if idx <= 0 || idx >= len(p.actionInfos) { + return "", false + } + return p.actionInfos[idx].Type, true +} diff --git a/playground/fakeclock.go b/playground/fakeclock.go new file mode 100644 index 000000000..5e974ccb5 --- /dev/null +++ b/playground/fakeclock.go @@ -0,0 +1,26 @@ +package playground + +import ( + "time" + + "go.uber.org/zap/zapcore" +) + +type ZeroClock struct { + start time.Time +} + +func NewZeroClock(now time.Time) *ZeroClock { + return &ZeroClock{start: now} +} + +var _ zapcore.Clock = ZeroClock{} + +func (z ZeroClock) Now() time.Time { + diff := time.Since(z.start) + return time.Time{}.Add(diff) +} + +func (z ZeroClock) NewTicker(_ time.Duration) *time.Ticker { + return new(time.Ticker) +} diff --git a/playground/server.go b/playground/server.go new file mode 100644 index 000000000..def5bfc9e --- /dev/null +++ b/playground/server.go @@ -0,0 +1,269 @@ +package playground + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "runtime" + "strings" + "time" + + "github.com/bitly/go-simplejson" + "github.com/ozontech/file.d/fd" + "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/plugin/output/devnull" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "go.uber.org/atomic" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "sigs.k8s.io/yaml" +) + +const ( + pipelineCapacity = 1 +) + +type DoActionsRequest struct { + Actions []json.RawMessage `json:"actions"` + Events []json.RawMessage `json:"events"` + Debug bool `json:"debug"` +} + +type ProcessResult struct { + Event json.RawMessage `json:"event"` +} + +type DoActionsResponse struct { + Result []ProcessResult `json:"result"` + Stdout string `json:"stdout"` + Metrics string `json:"metrics"` +} + +type DoActionsHandler struct { + plugins *fd.PluginRegistry + logger *zap.Logger + requests atomic.Int64 +} + +var _ http.Handler = (*DoActionsHandler)(nil) + +func NewDoActionsHandler(plugins *fd.PluginRegistry, logger *zap.Logger) *DoActionsHandler { + return &DoActionsHandler{plugins: plugins, logger: logger} +} + +func (h *DoActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "", http.StatusMethodNotAllowed) + return + } + + req, err := h.unmarshalRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if len(req.Events) > 32 || len(req.Events) == 0 || len(req.Actions) > 64 { + http.Error(w, "validate error: events count must be in range [0, 32] and actions count [0, 64]", http.StatusBadRequest) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), time.Second*2) + defer cancel() + + resp, code, err := h.doActions(ctx, req) + if err != nil { + http.Error(w, fmt.Sprintf("do actions: %s", err.Error()), code) + return + } + _ = json.NewEncoder(w).Encode(resp) +} + +func (h *DoActionsHandler) doActions(ctx context.Context, req DoActionsRequest) (resp DoActionsResponse, code int, err error) { + if req.Debug { + // wrap all plugins with the debug plugin + newActions := make([]json.RawMessage, 0, len(req.Actions)*2) + debugAction := json.RawMessage(`{"type": "debug"}`) + + newActions = append(newActions, debugAction) + for _, action := range req.Actions { + newActions = append(newActions, action, debugAction) + } + + req.Actions = newActions + } + + // stdout buffer + buf := new(bytes.Buffer) + buf.Grow(1 << 10) + stdout := preparePipelineLogger(buf) + + metricsRegistry := prometheus.NewRegistry() + + p := pipeline.New("test", &pipeline.Settings{ + Decoder: "json", + Capacity: pipelineCapacity, + MaintenanceInterval: time.Millisecond * 100, + EventTimeout: time.Millisecond * 100, + AntispamThreshold: 0, + IsStrict: false, + }, metricsRegistry, stdout.Sugar()) + + events := make(chan json.RawMessage, len(req.Events)) + outputCb := func(event *pipeline.Event) { + events <- event.Root.EncodeToByte() + } + + if err := h.setupPipeline(p, req, outputCb); err != nil { + return resp, http.StatusBadRequest, err + } + + p.Start() + // push events + for i, event := range req.Events { + p.In(pipeline.SourceID(h.requests.Inc()), "fake", int64(i), event, true) + } + + // collect result + var result []ProcessResult +loop: + for { + select { + case <-ctx.Done(): + h.logger.Warn("request timed out") + break loop + default: + result = append(result, ProcessResult{ + Event: <-events, + }) + if len(result) >= len(req.Events) { + break loop + } + runtime.Gosched() + } + } + p.Stop() + + // collect metrics + metricsInfo, err := metricsRegistry.Gather() + if err != nil { + h.logger.Error("can't gather metrics", zap.Error(err)) + } + + _ = stdout.Sync() + + return DoActionsResponse{ + Result: result, + Stdout: buf.String(), + Metrics: formatMetricFamily(metricsInfo), + }, http.StatusOK, nil +} + +func (h *DoActionsHandler) setupPipeline(p *pipeline.Pipeline, req DoActionsRequest, cb func(event *pipeline.Event)) error { + if req.Actions == nil { + req.Actions = []json.RawMessage{[]byte(`[]`)} + } + + actionsArray, _ := json.Marshal(req.Actions) + actionsRaw, err := simplejson.NewJson(actionsArray) + if err != nil { + return fmt.Errorf("read actions: %w", err) + } + values := map[string]int{ + "capacity": pipelineCapacity, + "gomaxprocs": runtime.GOMAXPROCS(0), + } + if err := fd.SetupActions(p, h.plugins, actionsRaw, values); err != nil { + return err + } + + // setup input + inputStaticInfo, err := h.plugins.Get(pipeline.PluginKindInput, "fake") + if err != nil { + return err + } + + inputPlugin, config := inputStaticInfo.Factory() + inputStaticInfo.Config = config + + p.SetInput(&pipeline.InputPluginInfo{ + PluginStaticInfo: inputStaticInfo, + PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{ + Plugin: inputPlugin, + }, + }) + + // setup output + outputStaticInfo, err := h.plugins.Get(pipeline.PluginKindOutput, "devnull") + if err != nil { + return err + } + + outputPlugin, config := outputStaticInfo.Factory() + outputStaticInfo.Config = config + + outputPlugin.(*devnull.Plugin).SetOutFn(cb) + + p.SetOutput(&pipeline.OutputPluginInfo{ + PluginStaticInfo: outputStaticInfo, + PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{ + Plugin: outputPlugin, + }, + }) + return nil +} + +func (h *DoActionsHandler) unmarshalRequest(r *http.Request) (DoActionsRequest, error) { + defer func(body io.ReadCloser) { + _ = body.Close() + }(r.Body) + + bodyRaw, err := io.ReadAll(r.Body) + if err != nil { + return DoActionsRequest{}, fmt.Errorf("reading body: %s", err) + } + + if strings.HasSuffix(r.Header.Get("Content-Type"), "yaml") { + bodyRaw, err = yaml.YAMLToJSON(bodyRaw) + if err != nil { + return DoActionsRequest{}, fmt.Errorf("converting YAML to JSON: %s", err) + } + } + + var req DoActionsRequest + if err := json.Unmarshal(bodyRaw, &req); err != nil { + return DoActionsRequest{}, fmt.Errorf("unmarshalling json: %s", err) + } + return req, nil +} + +func preparePipelineLogger(buf *bytes.Buffer) *zap.Logger { + encoderConfig := zap.NewProductionEncoderConfig() + encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("05.000000") + + stdout := zap.New( + zapcore.NewCore( + zapcore.NewJSONEncoder(encoderConfig), + zapcore.AddSync(buf), + zapcore.DebugLevel, + ), + zap.WithFatalHook(zapcore.WriteThenGoexit), + zap.WithCaller(false), + zap.WithClock(NewZeroClock(time.Now())), + ) + return stdout +} + +func formatMetricFamily(families []*dto.MetricFamily) string { + b := new(bytes.Buffer) + for _, f := range families { + _ = expfmt.NewEncoder(b, expfmt.FmtOpenMetrics).Encode(f) + b.WriteString("\n") + } + return b.String() +} diff --git a/plugin/action/debug/debug.go b/plugin/action/debug/debug.go index 3834780f4..ef6757b04 100644 --- a/plugin/action/debug/debug.go +++ b/plugin/action/debug/debug.go @@ -1,16 +1,22 @@ package debug import ( + "encoding/json" + "github.com/ozontech/file.d/fd" - "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/pipeline" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) /*{ introduction It logs event to stdout. Useful for debugging. }*/ -type Plugin struct{} +type Plugin struct { + logger *zap.Logger + controller pipeline.ActionPluginController +} type Config map[string]any @@ -25,15 +31,24 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { return &Plugin{}, &Config{} } -func (p *Plugin) Start(_ pipeline.AnyConfig, _ *pipeline.ActionPluginParams) { +func (p *Plugin) Start(_ pipeline.AnyConfig, params *pipeline.ActionPluginParams) { + p.logger = params.Logger.Desugar() + p.controller = params.Controller } func (p *Plugin) Stop() { } func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { - buf, _ := event.Encode(nil) - logger.Infof("%s", buf) - + prevAction, ok := p.controller.ActionTypeByIndex(event.Action() - 1) + if !ok { + prevAction = "initial" + } + if ce := p.logger.Check(zapcore.InfoLevel, prevAction); ce != nil { + ce.Write( + zap.Int64("offset", event.Offset), + zap.Any("event", json.RawMessage(event.Root.EncodeToString())), + ) + } return pipeline.ActionPass } diff --git a/plugin/output/file/helpers_test.go b/plugin/output/file/helpers_test.go index efe667da4..1edbd4448 100644 --- a/plugin/output/file/helpers_test.go +++ b/plugin/output/file/helpers_test.go @@ -9,6 +9,7 @@ import ( "github.com/ozontech/file.d/plugin/input/fake" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + "go.uber.org/zap" ) func createFile(t *testing.T, fileName string, data *[]byte) *os.File { @@ -56,7 +57,7 @@ func newPipeline(t *testing.T, configOutput *Config) *pipeline.Pipeline { Decoder: "json", } - p := pipeline.New("test_pipeline", settings, prometheus.NewRegistry()) + p := pipeline.New("test_pipeline", settings, prometheus.NewRegistry(), zap.NewNop().Sugar()) p.DisableParallelism() p.EnableEventLog() diff --git a/plugin/output/s3/s3_test.go b/plugin/output/s3/s3_test.go index 6f91f9294..1bc580a73 100644 --- a/plugin/output/s3/s3_test.go +++ b/plugin/output/s3/s3_test.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "go.uber.org/atomic" + "go.uber.org/zap" "golang.org/x/net/context" ) @@ -417,7 +418,7 @@ func newPipeline(t *testing.T, configOutput *Config, objStoreF objStoreFactory) Decoder: "json", } - p := pipeline.New("test_pipeline", settings, prometheus.NewRegistry()) + p := pipeline.New("test_pipeline", settings, prometheus.NewRegistry(), zap.NewNop().Sugar()) p.DisableParallelism() p.EnableEventLog() diff --git a/test/test.go b/test/test.go index 7bbe10d1b..f12418eb1 100644 --- a/test/test.go +++ b/test/test.go @@ -127,7 +127,7 @@ func NewPipeline(actions []*pipeline.ActionPluginStaticInfo, pipelineOpts ...str pName += strconv.Itoa(rand.Int()) } - p := pipeline.New(pName, settings, prometheus.NewRegistry()) + p := pipeline.New(pName, settings, prometheus.NewRegistry(), zap.NewNop().Sugar()) if !parallel { p.DisableParallelism() }