From 30bbd9a8e77baadf7391c73adefa61ab36a0f7b8 Mon Sep 17 00:00:00 2001 From: zipper-meng Date: Thu, 1 Aug 2024 11:41:31 +0800 Subject: [PATCH] feat(accumulator): add high priority accumulator to get response from output plugins immediately --- accumulator.go | 9 ++++ agent/accumulator.go | 22 +++++++++ agent/agent.go | 4 ++ config/config.go | 11 +++++ input.go | 8 ++++ metric.go | 9 ++++ metric/metric.go | 16 +++++++ models/running_input.go | 2 + models/running_output.go | 9 ++++ .../http_listener_v2/http_listener_v2.go | 48 +++++++++++++++++-- .../inputs/socket_listener/socket_listener.go | 19 +++++++- testutil/accumulator.go | 9 ++++ 12 files changed, 161 insertions(+), 5 deletions(-) diff --git a/accumulator.go b/accumulator.go index 6d740fb529021..c9a4e7a14a4a3 100644 --- a/accumulator.go +++ b/accumulator.go @@ -52,6 +52,9 @@ type Accumulator interface { // Upgrade to a TrackingAccumulator with space for maxTracked // metrics/batches. WithTracking(maxTracked int) TrackingAccumulator + + // Upgrade to a HighPriorityAccumulator + ToHighPriority() HighPriorityAccumulator } // TrackingID uniquely identifies a tracked metric group @@ -91,3 +94,9 @@ type TrackingAccumulator interface { // Delivered returns a channel that will contain the tracking results. Delivered() <-chan DeliveryInfo } + +type HighPriorityAccumulator interface { + Accumulator + + AddMetricHighPriority(Metric) error +} diff --git a/agent/accumulator.go b/agent/accumulator.go index 9105c17611292..0f27b90c72efe 100644 --- a/agent/accumulator.go +++ b/agent/accumulator.go @@ -126,6 +126,12 @@ func (ac *accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator } } +func (ac *accumulator) ToHighPriority() telegraf.HighPriorityAccumulator { + return &highPriorityAccumulator{ + ac, + } +} + type trackingAccumulator struct { telegraf.Accumulator delivered chan telegraf.DeliveryInfo @@ -158,3 +164,19 @@ func (a *trackingAccumulator) onDelivery(info telegraf.DeliveryInfo) { panic("channel is full") } } + +type highPriorityAccumulator struct { + *accumulator +} + +func (ac *highPriorityAccumulator) AddMetricHighPriority(m telegraf.Metric) error { + m.SetTime(m.Time().Round(ac.precision)) + if m := ac.maker.MakeMetric(m); m != nil { + errCh := make(chan error, 1) + ac.metrics <- m.ToHighPriority(errCh) + + err := <-errCh + return err + } + return nil +} diff --git a/agent/agent.go b/agent/agent.go index 5d7cc033fe5b9..f6881a91f5366 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -354,6 +354,10 @@ func (a *Agent) startInputs( acc := NewAccumulator(input, dst) acc.SetPrecision(getPrecision(precision, interval)) + if input.Config.HighPriorityIO { + acc = acc.ToHighPriority() + } + if err := input.Start(acc); err != nil { // If the model tells us to remove the plugin we do so without error var fatalErr *internal.FatalError diff --git a/config/config.go b/config/config.go index efe43c7fd634c..640da57c679ae 100644 --- a/config/config.go +++ b/config/config.go @@ -1323,6 +1323,13 @@ func (c *Config) addInput(name string, table *ast.Table) error { return err } + if pluginConfig.HighPriorityIO { + if _, ok = input.(telegraf.HighPriorityInput); !ok { + return fmt.Errorf("input plugin %s is not high priority input plugin", pluginConfig.Name) + } + log.Printf("I! [agent] Input plugin %s is high-priority-IO\n", pluginConfig.Name) + } + if err := c.toml.UnmarshalTable(table, input); err != nil { return err } @@ -1531,6 +1538,7 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e cp.NameOverride = c.getFieldString(tbl, "name_override") cp.Alias = c.getFieldString(tbl, "alias") cp.LogLevel = c.getFieldString(tbl, "log_level") + cp.HighPriorityIO = c.getFieldBool(tbl, "high_priority_io") cp.Tags = make(map[string]string) if node, ok := tbl.Fields["tags"]; ok { @@ -1621,6 +1629,9 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { // Parser and serializer options to ignore case "data_type", "influx_parser_type": + // Customized options to ignore + case "high_priority_io": + default: c.unusedFieldsMutex.Lock() c.UnusedFields[key] = true diff --git a/input.go b/input.go index 0f2dac2a397f2..7b646ef4d2cb8 100644 --- a/input.go +++ b/input.go @@ -21,3 +21,11 @@ type ServiceInput interface { // to the accumulator before returning. Stop() } + +type HighPriorityInput interface { + Input + + // MarkHighPriority does nothing but tell us this input plugin may be a high + // priority input plugin. + MarkHighPriority() +} diff --git a/metric.go b/metric.go index adad3cfc38c33..bb3ac7592eece 100644 --- a/metric.go +++ b/metric.go @@ -125,6 +125,15 @@ type Metric interface { // Drop marks the metric as processed successfully without being written // to any output. Drop() + + // ToHighPriority upgrade the metric to a HighPriorityMetric. + ToHighPriority(chan<- error) HighPriorityMetric +} + +type HighPriorityMetric interface { + Metric + + ErrorCh() chan<- error } // TemplateMetric is an interface to use in templates (e.g text/template) diff --git a/metric/metric.go b/metric/metric.go index f07f54c2672e6..0106507b4e9fa 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -286,6 +286,22 @@ func (m *metric) Reject() { func (m *metric) Drop() { } +func (m *metric) ToHighPriority(errorCh chan<- error) telegraf.HighPriorityMetric { + return &highPriorityMetric{ + metric: m, + errorCh: errorCh, + } +} + +type highPriorityMetric struct { + *metric + errorCh chan<- error +} + +func (m *highPriorityMetric) ErrorCh() chan<- error { + return m.errorCh +} + // Convert field to a supported type or nil if inconvertible func convertField(v interface{}) interface{} { switch v := v.(type) { diff --git a/models/running_input.go b/models/running_input.go index 241bb3742c95a..2b5921494edb9 100644 --- a/models/running_input.go +++ b/models/running_input.go @@ -90,6 +90,8 @@ type InputConfig struct { StartupErrorBehavior string LogLevel string + HighPriorityIO bool + NameOverride string MeasurementPrefix string MeasurementSuffix string diff --git a/models/running_output.go b/models/running_output.go index 063ceb7012c20..b98f47a467130 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -242,6 +242,15 @@ func (r *RunningOutput) AddMetric(metric telegraf.Metric) { metric.AddSuffix(r.Config.NameSuffix) } + if mt, ok := metric.(telegraf.HighPriorityMetric); ok { + r.log.Tracef("calling output plugin %s directly - before\n", r.Config.Name) + err := r.Output.Write([]telegraf.Metric{mt}) + r.log.Tracef("calling output plugin %s directly - after\n", r.Config.Name) + mt.ErrorCh() <- err + r.log.Tracef("calling output plugin %s directly - responded\n", r.Config.Name) + return + } + dropped := r.buffer.Add(metric) atomic.AddInt64(&r.droppedMetrics, int64(dropped)) diff --git a/plugins/inputs/http_listener_v2/http_listener_v2.go b/plugins/inputs/http_listener_v2/http_listener_v2.go index 825da44535801..4bec7f2283f69 100644 --- a/plugins/inputs/http_listener_v2/http_listener_v2.go +++ b/plugins/inputs/http_listener_v2/http_listener_v2.go @@ -293,6 +293,35 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) }) } + switch acc := h.acc.(type) { + case telegraf.HighPriorityAccumulator: + h.writeToHighPriorityAccumulator(req, res, acc, metrics) + default: + for _, m := range metrics { + for headerName, measurementName := range h.HTTPHeaderTags { + headerValues := req.Header.Get(headerName) + if len(headerValues) > 0 { + m.AddTag(measurementName, headerValues) + } + } + + if h.PathTag { + m.AddTag(pathTag, req.URL.Path) + } + + h.acc.AddMetric(m) + } + } + + res.WriteHeader(h.SuccessCode) +} + +func (h *HTTPListenerV2) writeToHighPriorityAccumulator( + req *http.Request, + res http.ResponseWriter, + acc telegraf.HighPriorityAccumulator, + metrics []telegraf.Metric, +) { for _, m := range metrics { for headerName, measurementName := range h.HTTPHeaderTags { headerValues := req.Header.Get(headerName) @@ -305,10 +334,19 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) m.AddTag(pathTag, req.URL.Path) } - h.acc.AddMetric(m) - } + if err := acc.AddMetricHighPriority(m); err != nil { + acc.AddError(fmt.Errorf("writing data to output failed: %w", err)) + h.Log.Debugf("got error from high-priority-IO") - res.WriteHeader(h.SuccessCode) + res.WriteHeader(http.StatusInternalServerError) + _, err = res.Write([]byte(fmt.Sprintf(`{"error":"%v"}`, err))) + if err != nil { + acc.AddError(fmt.Errorf("send htp response failed: %w", err)) + h.Log.Tracef("send http response failed: %v", err) + } + return + } + } } func (h *HTTPListenerV2) collectBody(res http.ResponseWriter, req *http.Request) ([]byte, bool) { @@ -419,6 +457,10 @@ func (h *HTTPListenerV2) authenticateIfSet(handler http.HandlerFunc, res http.Re } } +func (h *HTTPListenerV2) MarkHighPriority() { + // Do nothing +} + func init() { inputs.Add("http_listener_v2", func() telegraf.Input { return &HTTPListenerV2{ diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index b4783afc1af86..e459e186a5eb8 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -4,6 +4,7 @@ package socket_listener import ( _ "embed" + "fmt" "net" "sync" @@ -63,8 +64,18 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { sl.Log.Debug(internal.NoMetricsCreatedMsg) }) } - for _, m := range metrics { - acc.AddMetric(m) + switch ac := acc.(type) { + case telegraf.HighPriorityAccumulator: + for _, m := range metrics { + if err = ac.AddMetricHighPriority(m); err != nil { + sl.Log.Tracef("input socket_listener got error from high-priority-IO") + acc.AddError(fmt.Errorf("writing data to output failed: %w", err)) + } + } + default: + for _, m := range metrics { + ac.AddMetric(m) + } } } onError := func(err error) { @@ -88,6 +99,10 @@ func (sl *SocketListener) Stop() { } } +func (sl *SocketListener) MarkHighPriority() { + // Do nothing +} + func init() { inputs.Add("socket_listener", func() telegraf.Input { return &SocketListener{} diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 66052077556a9..bdf544e7ad7ff 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -176,6 +176,11 @@ func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) { } } +func (a *Accumulator) AddMetricHighPriority(m telegraf.Metric) error { + a.addMeasurement(m.Name(), m.Tags(), m.Fields(), m.Type(), m.Time()) + return nil +} + func (a *Accumulator) AddSummary( measurement string, fields map[string]interface{}, @@ -222,6 +227,10 @@ func (a *Accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator return a } +func (a *Accumulator) ToHighPriority() telegraf.HighPriorityAccumulator { + return a +} + func (a *Accumulator) AddTrackingMetric(m telegraf.Metric) telegraf.TrackingID { dm, id := metric.WithTracking(m, a.onDelivery) a.AddMetric(dm)