Skip to content

Commit

Permalink
feat(common.shim): Convert uneven bytes to int
Browse files Browse the repository at this point in the history
  • Loading branch information
revilon1991 committed Nov 5, 2024
1 parent f43605a commit 2cedc7c
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 10 deletions.
15 changes: 10 additions & 5 deletions plugins/common/shim/goshim.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type Shim struct {
Processor telegraf.StreamingProcessor
Output telegraf.Output

BatchSize int
BatchTimeout time.Duration

log telegraf.Logger

// streams
Expand All @@ -56,11 +59,13 @@ type Shim struct {
// New creates a new shim interface
func New() *Shim {
return &Shim{
metricCh: make(chan telegraf.Metric, 1),
stdin: os.Stdin,
stdout: os.Stdout,
stderr: os.Stderr,
log: logger.New("", "", ""),
BatchSize: 1,
BatchTimeout: 5 * time.Second,
metricCh: make(chan telegraf.Metric, 1),
stdin: os.Stdin,
stdout: os.Stdout,
stderr: os.Stderr,
log: logger.New("", "", ""),
}
}

Expand Down
49 changes: 44 additions & 5 deletions plugins/common/shim/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package shim
import (
"bufio"
"fmt"
"os"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
Expand Down Expand Up @@ -36,19 +38,56 @@ func (s *Shim) RunOutput() error {
}
defer s.Output.Close()

var m telegraf.Metric
mCh := make(chan telegraf.Metric)
done := make(chan struct{})

go func() {
var batch []telegraf.Metric
timer := time.NewTimer(s.BatchTimeout)
defer timer.Stop()

for {
select {
case m := <-mCh:
batch = append(batch, m)
if len(batch) >= s.BatchSize {
if err = s.Output.Write(batch); err != nil {
fmt.Fprintf(os.Stderr, "Failed to write metrics: %s\n", err)
}
batch = batch[:0]
timer.Reset(s.BatchTimeout)
}
case <-timer.C:
if len(batch) > 0 {
if err = s.Output.Write(batch); err != nil {
fmt.Fprintf(os.Stderr, "Failed to write metrics: %s\n", err)
}
batch = batch[:0]
}
timer.Reset(s.BatchTimeout)
case <-done:
if len(batch) > 0 {
if err = s.Output.Write(batch); err != nil {
fmt.Fprintf(os.Stderr, "Failed to write remaining metrics: %s\n", err)
}
}
return
}
}
}()

scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
m, err = parser.ParseLine(scanner.Text())
m, err := parser.ParseLine(scanner.Text())
if err != nil {
fmt.Fprintf(s.stderr, "Failed to parse metric: %s\n", err)
continue
}
if err = s.Output.Write([]telegraf.Metric{m}); err != nil {
fmt.Fprintf(s.stderr, "Failed to write metric: %s\n", err)
}

mCh <- m
}

close(done)

return nil
}

0 comments on commit 2cedc7c

Please sign in to comment.