diff --git a/plugins/common/shim/goshim.go b/plugins/common/shim/goshim.go index 3d329cf04babb..939e4f06e86ec 100644 --- a/plugins/common/shim/goshim.go +++ b/plugins/common/shim/goshim.go @@ -39,6 +39,9 @@ type Shim struct { Processor telegraf.StreamingProcessor Output telegraf.Output + BatchSize int + BatchTimeout time.Duration + log telegraf.Logger // streams @@ -56,11 +59,13 @@ type Shim struct { // New creates a new shim interface func New() *Shim { return &Shim{ - metricCh: make(chan telegraf.Metric, 1), - stdin: os.Stdin, - stdout: os.Stdout, - stderr: os.Stderr, - log: logger.New("", "", ""), + BatchSize: 1, + BatchTimeout: 5 * time.Second, + metricCh: make(chan telegraf.Metric, 1), + stdin: os.Stdin, + stdout: os.Stdout, + stderr: os.Stderr, + log: logger.New("", "", ""), } } diff --git a/plugins/common/shim/output.go b/plugins/common/shim/output.go index 88108673a5a93..233d7645c2889 100644 --- a/plugins/common/shim/output.go +++ b/plugins/common/shim/output.go @@ -3,6 +3,8 @@ package shim import ( "bufio" "fmt" + "os" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/models" @@ -36,19 +38,56 @@ func (s *Shim) RunOutput() error { } defer s.Output.Close() - var m telegraf.Metric + mCh := make(chan telegraf.Metric) + done := make(chan struct{}) + + go func() { + var batch []telegraf.Metric + timer := time.NewTimer(s.BatchTimeout) + defer timer.Stop() + + for { + select { + case m := <-mCh: + batch = append(batch, m) + if len(batch) >= s.BatchSize { + if err = s.Output.Write(batch); err != nil { + fmt.Fprintf(os.Stderr, "Failed to write metrics: %s\n", err) + } + batch = batch[:0] + timer.Reset(s.BatchTimeout) + } + case <-timer.C: + if len(batch) > 0 { + if err = s.Output.Write(batch); err != nil { + fmt.Fprintf(os.Stderr, "Failed to write metrics: %s\n", err) + } + batch = batch[:0] + } + timer.Reset(s.BatchTimeout) + case <-done: + if len(batch) > 0 { + if err = s.Output.Write(batch); err != nil { + fmt.Fprintf(os.Stderr, "Failed to write remaining metrics: %s\n", err) + } + } + return + } + } + }() scanner := bufio.NewScanner(s.stdin) for scanner.Scan() { - m, err = parser.ParseLine(scanner.Text()) + m, err := parser.ParseLine(scanner.Text()) if err != nil { fmt.Fprintf(s.stderr, "Failed to parse metric: %s\n", err) continue } - if err = s.Output.Write([]telegraf.Metric{m}); err != nil { - fmt.Fprintf(s.stderr, "Failed to write metric: %s\n", err) - } + + mCh <- m } + close(done) + return nil }