Skip to content

Commit

Permalink
feat(accumulator): add high priority accumulator to get response from…
Browse files Browse the repository at this point in the history
… output plugins immediately
  • Loading branch information
zipper-meng committed Oct 29, 2024
1 parent 160548d commit 30bbd9a
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 5 deletions.
9 changes: 9 additions & 0 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type Accumulator interface {
// Upgrade to a TrackingAccumulator with space for maxTracked
// metrics/batches.
WithTracking(maxTracked int) TrackingAccumulator

// Upgrade to a HighPriorityAccumulator
ToHighPriority() HighPriorityAccumulator
}

// TrackingID uniquely identifies a tracked metric group
Expand Down Expand Up @@ -91,3 +94,9 @@ type TrackingAccumulator interface {
// Delivered returns a channel that will contain the tracking results.
Delivered() <-chan DeliveryInfo
}

type HighPriorityAccumulator interface {
Accumulator

AddMetricHighPriority(Metric) error
}
22 changes: 22 additions & 0 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ func (ac *accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator
}
}

func (ac *accumulator) ToHighPriority() telegraf.HighPriorityAccumulator {
return &highPriorityAccumulator{
ac,
}
}

type trackingAccumulator struct {
telegraf.Accumulator
delivered chan telegraf.DeliveryInfo
Expand Down Expand Up @@ -158,3 +164,19 @@ func (a *trackingAccumulator) onDelivery(info telegraf.DeliveryInfo) {
panic("channel is full")
}
}

type highPriorityAccumulator struct {
*accumulator
}

func (ac *highPriorityAccumulator) AddMetricHighPriority(m telegraf.Metric) error {
m.SetTime(m.Time().Round(ac.precision))
if m := ac.maker.MakeMetric(m); m != nil {
errCh := make(chan error, 1)
ac.metrics <- m.ToHighPriority(errCh)

err := <-errCh
return err
}
return nil
}
4 changes: 4 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ func (a *Agent) startInputs(
acc := NewAccumulator(input, dst)
acc.SetPrecision(getPrecision(precision, interval))

if input.Config.HighPriorityIO {
acc = acc.ToHighPriority()
}

if err := input.Start(acc); err != nil {
// If the model tells us to remove the plugin we do so without error
var fatalErr *internal.FatalError
Expand Down
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,13 @@ func (c *Config) addInput(name string, table *ast.Table) error {
return err
}

if pluginConfig.HighPriorityIO {
if _, ok = input.(telegraf.HighPriorityInput); !ok {
return fmt.Errorf("input plugin %s is not high priority input plugin", pluginConfig.Name)
}
log.Printf("I! [agent] Input plugin %s is high-priority-IO\n", pluginConfig.Name)
}

if err := c.toml.UnmarshalTable(table, input); err != nil {
return err
}
Expand Down Expand Up @@ -1531,6 +1538,7 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
cp.NameOverride = c.getFieldString(tbl, "name_override")
cp.Alias = c.getFieldString(tbl, "alias")
cp.LogLevel = c.getFieldString(tbl, "log_level")
cp.HighPriorityIO = c.getFieldBool(tbl, "high_priority_io")

cp.Tags = make(map[string]string)
if node, ok := tbl.Fields["tags"]; ok {
Expand Down Expand Up @@ -1621,6 +1629,9 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
// Parser and serializer options to ignore
case "data_type", "influx_parser_type":

// Customized options to ignore
case "high_priority_io":

default:
c.unusedFieldsMutex.Lock()
c.UnusedFields[key] = true
Expand Down
8 changes: 8 additions & 0 deletions input.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,11 @@ type ServiceInput interface {
// to the accumulator before returning.
Stop()
}

type HighPriorityInput interface {
Input

// MarkHighPriority does nothing but tell us this input plugin may be a high
// priority input plugin.
MarkHighPriority()
}
9 changes: 9 additions & 0 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ type Metric interface {
// Drop marks the metric as processed successfully without being written
// to any output.
Drop()

// ToHighPriority upgrade the metric to a HighPriorityMetric.
ToHighPriority(chan<- error) HighPriorityMetric
}

type HighPriorityMetric interface {
Metric

ErrorCh() chan<- error
}

// TemplateMetric is an interface to use in templates (e.g text/template)
Expand Down
16 changes: 16 additions & 0 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,22 @@ func (m *metric) Reject() {
func (m *metric) Drop() {
}

func (m *metric) ToHighPriority(errorCh chan<- error) telegraf.HighPriorityMetric {
return &highPriorityMetric{
metric: m,
errorCh: errorCh,
}
}

type highPriorityMetric struct {
*metric
errorCh chan<- error
}

func (m *highPriorityMetric) ErrorCh() chan<- error {
return m.errorCh
}

// Convert field to a supported type or nil if inconvertible
func convertField(v interface{}) interface{} {
switch v := v.(type) {
Expand Down
2 changes: 2 additions & 0 deletions models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type InputConfig struct {
StartupErrorBehavior string
LogLevel string

HighPriorityIO bool

NameOverride string
MeasurementPrefix string
MeasurementSuffix string
Expand Down
9 changes: 9 additions & 0 deletions models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ func (r *RunningOutput) AddMetric(metric telegraf.Metric) {
metric.AddSuffix(r.Config.NameSuffix)
}

if mt, ok := metric.(telegraf.HighPriorityMetric); ok {
r.log.Tracef("calling output plugin %s directly - before\n", r.Config.Name)
err := r.Output.Write([]telegraf.Metric{mt})
r.log.Tracef("calling output plugin %s directly - after\n", r.Config.Name)
mt.ErrorCh() <- err
r.log.Tracef("calling output plugin %s directly - responded\n", r.Config.Name)
return
}

dropped := r.buffer.Add(metric)
atomic.AddInt64(&r.droppedMetrics, int64(dropped))

Expand Down
48 changes: 45 additions & 3 deletions plugins/inputs/http_listener_v2/http_listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,35 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request)
})
}

switch acc := h.acc.(type) {
case telegraf.HighPriorityAccumulator:
h.writeToHighPriorityAccumulator(req, res, acc, metrics)
default:
for _, m := range metrics {
for headerName, measurementName := range h.HTTPHeaderTags {
headerValues := req.Header.Get(headerName)
if len(headerValues) > 0 {
m.AddTag(measurementName, headerValues)
}
}

if h.PathTag {
m.AddTag(pathTag, req.URL.Path)
}

h.acc.AddMetric(m)
}
}

res.WriteHeader(h.SuccessCode)
}

func (h *HTTPListenerV2) writeToHighPriorityAccumulator(
req *http.Request,
res http.ResponseWriter,
acc telegraf.HighPriorityAccumulator,
metrics []telegraf.Metric,
) {
for _, m := range metrics {
for headerName, measurementName := range h.HTTPHeaderTags {
headerValues := req.Header.Get(headerName)
Expand All @@ -305,10 +334,19 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request)
m.AddTag(pathTag, req.URL.Path)
}

h.acc.AddMetric(m)
}
if err := acc.AddMetricHighPriority(m); err != nil {
acc.AddError(fmt.Errorf("writing data to output failed: %w", err))
h.Log.Debugf("got error from high-priority-IO")

res.WriteHeader(h.SuccessCode)
res.WriteHeader(http.StatusInternalServerError)
_, err = res.Write([]byte(fmt.Sprintf(`{"error":"%v"}`, err)))
if err != nil {
acc.AddError(fmt.Errorf("send htp response failed: %w", err))
h.Log.Tracef("send http response failed: %v", err)
}
return
}
}
}

func (h *HTTPListenerV2) collectBody(res http.ResponseWriter, req *http.Request) ([]byte, bool) {
Expand Down Expand Up @@ -419,6 +457,10 @@ func (h *HTTPListenerV2) authenticateIfSet(handler http.HandlerFunc, res http.Re
}
}

func (h *HTTPListenerV2) MarkHighPriority() {
// Do nothing
}

func init() {
inputs.Add("http_listener_v2", func() telegraf.Input {
return &HTTPListenerV2{
Expand Down
19 changes: 17 additions & 2 deletions plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package socket_listener

import (
_ "embed"
"fmt"
"net"
"sync"

Expand Down Expand Up @@ -63,8 +64,18 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
sl.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
for _, m := range metrics {
acc.AddMetric(m)
switch ac := acc.(type) {
case telegraf.HighPriorityAccumulator:
for _, m := range metrics {
if err = ac.AddMetricHighPriority(m); err != nil {
sl.Log.Tracef("input socket_listener got error from high-priority-IO")
acc.AddError(fmt.Errorf("writing data to output failed: %w", err))
}
}
default:
for _, m := range metrics {
ac.AddMetric(m)
}
}
}
onError := func(err error) {
Expand All @@ -88,6 +99,10 @@ func (sl *SocketListener) Stop() {
}
}

func (sl *SocketListener) MarkHighPriority() {
// Do nothing
}

func init() {
inputs.Add("socket_listener", func() telegraf.Input {
return &SocketListener{}
Expand Down
9 changes: 9 additions & 0 deletions testutil/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) {
}
}

func (a *Accumulator) AddMetricHighPriority(m telegraf.Metric) error {
a.addMeasurement(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time())
return nil
}

func (a *Accumulator) AddSummary(
measurement string,
fields map[string]interface{},
Expand Down Expand Up @@ -222,6 +227,10 @@ func (a *Accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator
return a
}

func (a *Accumulator) ToHighPriority() telegraf.HighPriorityAccumulator {
return a
}

func (a *Accumulator) AddTrackingMetric(m telegraf.Metric) telegraf.TrackingID {
dm, id := metric.WithTracking(m, a.onDelivery)
a.AddMetric(dm)
Expand Down

0 comments on commit 30bbd9a

Please sign in to comment.