diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index 776fc3c84..91934a88e 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -177,7 +177,7 @@ parameters: for _, aa := range actualAggs { promEncode.Encode(aa) } - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, promEncode.(*encode.EncodeProm).Gatherer()) for _, expected := range tt.expectedEncode { require.Contains(t, exposed, expected) diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 019352890..02996581d 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -27,9 +27,11 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/operational" promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" ) +var plog = logrus.WithField("component", "encode.Prometheus") + const defaultExpiryTime = time.Duration(2 * time.Minute) // nolint:revive @@ -38,11 +40,17 @@ type EncodeProm struct { registerer prometheus.Registerer metricCommon *MetricsCommonStruct updateChan chan config.StageParam + server *promserver.PromServer + regName string +} + +func (e *EncodeProm) Gatherer() prometheus.Gatherer { + return e.server } // Encode encodes a metric before being stored; the heavy work is done by the MetricCommonEncode func (e *EncodeProm) Encode(metricRecord config.GenericMap) { - log.Tracef("entering EncodeMetric. metricRecord = %v", metricRecord) + plog.Tracef("entering EncodeMetric. metricRecord = %v", metricRecord) e.metricCommon.MetricCommonEncode(e, metricRecord) e.checkConfUpdate() } @@ -106,45 +114,34 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) { cleanupFunc.(func())() } -func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) { +func (e *EncodeProm) addCounter(fullMetricName string, mInfo *MetricInfo) prometheus.Collector { counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, mInfo.Labels) - err := e.registerer.Register(counter) - if err != nil { - log.Errorf("error during prometheus.Register: %v", err) - } e.metricCommon.AddCounter(fullMetricName, counter, mInfo) + return counter } -func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) { +func (e *EncodeProm) addGauge(fullMetricName string, mInfo *MetricInfo) prometheus.Collector { gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, mInfo.Labels) - err := e.registerer.Register(gauge) - if err != nil { - log.Errorf("error during prometheus.Register: %v", err) - } e.metricCommon.AddGauge(fullMetricName, gauge, mInfo) + return gauge } -func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) { + +func (e *EncodeProm) addHistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector { histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels) - err := e.registerer.Register(histogram) - if err != nil { - log.Errorf("error during prometheus.Register: %v", err) - } e.metricCommon.AddHist(fullMetricName, histogram, mInfo) + return histogram } -func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) { + +func (e *EncodeProm) addAgghistogram(fullMetricName string, mInfo *MetricInfo) prometheus.Collector { agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: ""}, mInfo.Labels) - err := e.registerer.Register(agghistogram) - if err != nil { - log.Errorf("error during prometheus.Register: %v", err) - } e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo) + return agghistogram } func (e *EncodeProm) unregisterMetric(c interface{}) { if c, ok := c.(prometheus.Collector); ok { e.registerer.Unregister(c) } - } func (e *EncodeProm) cleanDeletedGeneric(newCfg api.PromEncode, metrics map[string]mInfoStruct) { @@ -178,6 +175,38 @@ func (e *EncodeProm) cleanDeletedMetrics(newCfg api.PromEncode) { e.cleanDeletedGeneric(newCfg, e.metricCommon.aggHistos) } +// returns true if a registry restart is needed +func (e *EncodeProm) checkMetricUpdate(prefix string, apiItem *api.MetricsItem, store map[string]mInfoStruct, createMetric func(string, *MetricInfo) prometheus.Collector) bool { + fullMetricName := prefix + apiItem.Name + plog.Debugf("Checking metric: %s", fullMetricName) + mInfo := CreateMetricInfo(apiItem) + if oldMetric, ok := store[fullMetricName]; ok { + if !reflect.DeepEqual(mInfo.MetricsItem.Labels, oldMetric.info.MetricsItem.Labels) { + plog.Debug("Changes detected in labels") + return true + } + if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) { + plog.Debug("Changes detected: unregistering and replacing") + e.unregisterMetric(oldMetric.genericMetric) + c := createMetric(fullMetricName, mInfo) + err := e.registerer.Register(c) + if err != nil { + plog.Errorf("error in prometheus.Register: %v", err) + } + } else { + plog.Debug("No changes found") + } + } else { + plog.Debug("New metric") + c := createMetric(fullMetricName, mInfo) + err := e.registerer.Register(c) + if err != nil { + plog.Errorf("error in prometheus.Register: %v", err) + } + } + return false +} + func (e *EncodeProm) checkConfUpdate() { select { case stage := <-e.updateChan: @@ -185,58 +214,35 @@ func (e *EncodeProm) checkConfUpdate() { if stage.Encode != nil && stage.Encode.Prom != nil { cfg = *stage.Encode.Prom } + plog.Infof("Received config update: %v", cfg) e.cleanDeletedMetrics(cfg) + needNewRegistry := false for i := range cfg.Metrics { - fullMetricName := cfg.Prefix + cfg.Metrics[i].Name - mInfo := CreateMetricInfo(&cfg.Metrics[i]) switch cfg.Metrics[i].Type { case api.MetricCounter: - if oldMetric, ok := e.metricCommon.counters[fullMetricName]; ok { - if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) { - e.unregisterMetric(oldMetric.genericMetric) - e.addCounter(fullMetricName, mInfo) - } - } else { - // New metric - e.addCounter(fullMetricName, mInfo) - } + needNewRegistry = e.checkMetricUpdate(cfg.Prefix, &cfg.Metrics[i], e.metricCommon.counters, e.addCounter) case api.MetricGauge: - if oldMetric, ok := e.metricCommon.gauges[fullMetricName]; ok { - if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) { - e.unregisterMetric(oldMetric.genericMetric) - e.addGauge(fullMetricName, mInfo) - } - } else { - // New metric - e.addGauge(fullMetricName, mInfo) - } + needNewRegistry = e.checkMetricUpdate(cfg.Prefix, &cfg.Metrics[i], e.metricCommon.gauges, e.addGauge) case api.MetricHistogram: - if oldMetric, ok := e.metricCommon.histos[fullMetricName]; ok { - if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) { - e.unregisterMetric(oldMetric.genericMetric) - e.addHistogram(fullMetricName, mInfo) - } - } else { - // New metric - e.addHistogram(fullMetricName, mInfo) - } + needNewRegistry = e.checkMetricUpdate(cfg.Prefix, &cfg.Metrics[i], e.metricCommon.histos, e.addHistogram) case api.MetricAggHistogram: - if oldMetric, ok := e.metricCommon.aggHistos[fullMetricName]; ok { - if !reflect.DeepEqual(mInfo.MetricsItem, oldMetric.info.MetricsItem) { - e.unregisterMetric(oldMetric.genericMetric) - e.addAgghistogram(fullMetricName, mInfo) - } - } else { - // New metric - e.addAgghistogram(fullMetricName, mInfo) - } + needNewRegistry = e.checkMetricUpdate(cfg.Prefix, &cfg.Metrics[i], e.metricCommon.aggHistos, e.addAgghistogram) case "default": - log.Errorf("invalid metric type = %v, skipping", cfg.Metrics[i].Type) + plog.Errorf("invalid metric type = %v, skipping", cfg.Metrics[i].Type) continue } - + if needNewRegistry { + break + } + } + e.cfg = &cfg + if needNewRegistry { + // cf https://pkg.go.dev/github.com/prometheus/client_golang@v1.19.0/prometheus#Registerer.Unregister + plog.Info("Changes detected on labels: need registry reset.") + e.resetRegistry() + break } default: //Nothing to do @@ -244,6 +250,40 @@ func (e *EncodeProm) checkConfUpdate() { } } +func (e *EncodeProm) resetRegistry() { + e.metricCommon.cleanupInfoStructs() + reg := prometheus.NewRegistry() + e.registerer = reg + for i := range e.cfg.Metrics { + mCfg := &e.cfg.Metrics[i] + fullMetricName := e.cfg.Prefix + mCfg.Name + labels := mCfg.Labels + plog.Debugf("Create metric: %s, Labels: %v", fullMetricName, labels) + mInfo := CreateMetricInfo(mCfg) + var m prometheus.Collector + switch mCfg.Type { + case api.MetricCounter: + m = e.addCounter(fullMetricName, mInfo) + case api.MetricGauge: + m = e.addGauge(fullMetricName, mInfo) + case api.MetricHistogram: + m = e.addHistogram(fullMetricName, mInfo) + case api.MetricAggHistogram: + m = e.addAgghistogram(fullMetricName, mInfo) + case "default": + plog.Errorf("invalid metric type = %v, skipping", mCfg.Type) + continue + } + if m != nil { + err := e.registerer.Register(m) + if err != nil { + plog.Errorf("error in prometheus.Register: %v", err) + } + } + } + e.server.SetRegistry(e.regName, reg) +} + func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) { cfg := api.PromEncode{} if params.Encode != nil && params.Encode.Prom != nil { @@ -254,73 +294,29 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En if expiryTime.Duration == 0 { expiryTime.Duration = defaultExpiryTime } - log.Debugf("expiryTime = %v", expiryTime) + plog.Debugf("expiryTime = %v", expiryTime) - var registerer prometheus.Registerer + registry := prometheus.NewRegistry() - if cfg.PromConnectionInfo != nil { - registry := prometheus.NewRegistry() - registerer = registry - promserver.StartServerAsync(cfg.PromConnectionInfo, nil) - } else { - registerer = prometheus.DefaultRegisterer - } w := &EncodeProm{ - cfg: params.Encode.Prom, - registerer: registerer, + cfg: &cfg, + registerer: registry, updateChan: make(chan config.StageParam), + server: promserver.SharedServer, + regName: params.Name, + } + + if cfg.PromConnectionInfo != nil { + // Start new server + w.server = promserver.StartServerAsync(cfg.PromConnectionInfo, params.Name, registry) } metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup) w.metricCommon = metricCommon - for i := range cfg.Metrics { - mCfg := &cfg.Metrics[i] - fullMetricName := cfg.Prefix + mCfg.Name - labels := mCfg.Labels - log.Debugf("fullMetricName = %v", fullMetricName) - log.Debugf("Labels = %v", labels) - mInfo := CreateMetricInfo(mCfg) - switch mCfg.Type { - case api.MetricCounter: - counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: ""}, labels) - err := registerer.Register(counter) - if err != nil { - log.Errorf("error during prometheus.Register: %v", err) - return nil, err - } - metricCommon.AddCounter(fullMetricName, counter, mInfo) - case api.MetricGauge: - gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels) - err := registerer.Register(gauge) - if err != nil { - log.Errorf("error during prometheus.Register: %v", err) - return nil, err - } - metricCommon.AddGauge(fullMetricName, gauge, mInfo) - case api.MetricHistogram: - log.Debugf("buckets = %v", mCfg.Buckets) - hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels) - err := registerer.Register(hist) - if err != nil { - log.Errorf("error during prometheus.Register: %v", err) - return nil, err - } - metricCommon.AddHist(fullMetricName, hist, mInfo) - case api.MetricAggHistogram: - log.Debugf("buckets = %v", mCfg.Buckets) - hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels) - err := registerer.Register(hist) - if err != nil { - log.Errorf("error during prometheus.Register: %v", err) - return nil, err - } - metricCommon.AddAggHist(fullMetricName, hist, mInfo) - case "default": - log.Errorf("invalid metric type = %v, skipping", mCfg.Type) - continue - } - } + // Init metrics + w.resetRegistry() + return w, nil } diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index ea442bf7c..f7b37636b 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -201,7 +201,7 @@ func Test_CustomMetric(t *testing.T) { } time.Sleep(100 * time.Millisecond) - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, encodeProm.server) require.Contains(t, exposed, `test_bytes_total{dstIP="10.0.0.1",srcIP="20.0.0.2"} 8`) require.Contains(t, exposed, `test_bytes_total{dstIP="30.0.0.3",srcIP="10.0.0.1"} 12`) @@ -273,7 +273,7 @@ func Test_FilterDuplicates(t *testing.T) { } time.Sleep(100 * time.Millisecond) - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, encodeProm.server) require.Contains(t, exposed, `bytes_unfiltered 15`) require.Contains(t, exposed, `bytes_filtered 8`) @@ -318,7 +318,7 @@ func Test_FilterNotNil(t *testing.T) { } time.Sleep(100 * time.Millisecond) - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, encodeProm.server) require.Contains(t, exposed, `test_latencies_sum 1`) require.Contains(t, exposed, `test_latencies_count 2`) @@ -373,7 +373,7 @@ func Test_FilterDirection(t *testing.T) { } time.Sleep(100 * time.Millisecond) - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, encodeProm.server) require.Contains(t, exposed, `test_ingress_packets_total 10`) require.Contains(t, exposed, `test_egress_packets_total 100`) @@ -431,7 +431,7 @@ func Test_FilterSameOrDifferentNamespace(t *testing.T) { } time.Sleep(100 * time.Millisecond) - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, encodeProm.server) require.Contains(t, exposed, `test_packets_same_namespace_total 43000`) require.Contains(t, exposed, `test_packets_different_namespace_total 210`) @@ -459,7 +459,7 @@ func Test_ValueScale(t *testing.T) { } time.Sleep(100 * time.Millisecond) - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, encodeProm.server) // One is less than 25ms, Two are less than 250ms require.Contains(t, exposed, `test_rtt_seconds_bucket{le="0.005"} 0`) @@ -510,7 +510,7 @@ func Test_MetricTTL(t *testing.T) { for _, metric := range metrics { encodeProm.Encode(metric) } - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, encodeProm.server) require.Contains(t, exposed, `test_bytes_total{dstIP="10.0.0.1",srcIP="20.0.0.2"}`) require.Contains(t, exposed, `test_bytes_total{dstIP="30.0.0.3",srcIP="10.0.0.1"}`) @@ -519,7 +519,7 @@ func Test_MetricTTL(t *testing.T) { time.Sleep(2 * time.Second) // Scrape a second time - exposed = test.ReadExposedMetrics(t) + exposed = test.ReadExposedMetrics(t, encodeProm.server) require.NotContains(t, exposed, `test_bytes_total{dstIP="10.0.0.1",srcIP="20.0.0.2"}`) require.NotContains(t, exposed, `test_bytes_total{dstIP="30.0.0.3",srcIP="10.0.0.1"}`) @@ -563,7 +563,7 @@ func Test_MissingLabels(t *testing.T) { } time.Sleep(100 * time.Millisecond) - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, encodeProm.server) require.Contains(t, exposed, `my_counter{namespace="A"} 8`) require.Contains(t, exposed, `my_counter{namespace=""} 4`) diff --git a/pkg/pipeline/encode/metrics_common.go b/pkg/pipeline/encode/metrics_common.go index 049134bff..4a14ba5e8 100644 --- a/pkg/pipeline/encode/metrics_common.go +++ b/pkg/pipeline/encode/metrics_common.go @@ -18,7 +18,6 @@ package encode import ( - "fmt" "strings" "time" @@ -242,7 +241,7 @@ func extractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[str for _, t := range info.Labels { entryLabels[t] = "" if v, ok := flow[t]; ok { - entryLabels[t] = fmt.Sprintf("%v", v) + entryLabels[t] = utils.ConvertToString(v) } key.WriteString(entryLabels[t]) key.WriteRune('|') @@ -263,6 +262,13 @@ func (m *MetricsCommonStruct) cleanupExpiredEntriesLoop(callback putils.CacheCal } } +func (m *MetricsCommonStruct) cleanupInfoStructs() { + m.gauges = map[string]mInfoStruct{} + m.counters = map[string]mInfoStruct{} + m.histos = map[string]mInfoStruct{} + m.aggHistos = map[string]mInfoStruct{} +} + func NewMetricsCommonStruct(opMetrics *operational.Metrics, maxCacheEntries int, name string, expiryTime api.Duration, callback putils.CacheCallback) *MetricsCommonStruct { mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, name) m := &MetricsCommonStruct{ diff --git a/pkg/pipeline/extract/conntrack/aggregator_test.go b/pkg/pipeline/extract/conntrack/aggregator_test.go index 158eda0f3..7a555c8ca 100644 --- a/pkg/pipeline/extract/conntrack/aggregator_test.go +++ b/pkg/pipeline/extract/conntrack/aggregator_test.go @@ -24,6 +24,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -244,7 +245,7 @@ func TestMissingFieldError(t *testing.T) { flowLog := config.GenericMap{} agg.update(conn, flowLog, dirAB, true) - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, prometheus.DefaultGatherer) require.Contains(t, exposed, `conntrack_aggregator_errors{error="MissingFieldError",field="Bytes"} 1`) } @@ -260,7 +261,7 @@ func TestSkipMissingFieldError(t *testing.T) { flowLog := config.GenericMap{} agg.update(conn, flowLog, dirAB, true) - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, prometheus.DefaultGatherer) require.NotContains(t, exposed, `conntrack_aggregator_errors{error="MissingFieldError",field="Bytes"}`) } @@ -276,6 +277,6 @@ func TestFloat64ConversionError(t *testing.T) { flowLog := config.GenericMap{"Bytes": "float64 inconvertible value"} agg.update(conn, flowLog, dirAB, true) - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, prometheus.DefaultGatherer) require.Contains(t, exposed, `conntrack_aggregator_errors{error="Float64ConversionError",field="Bytes"} 1`) } diff --git a/pkg/pipeline/extract/conntrack/conntrack_test.go b/pkg/pipeline/extract/conntrack/conntrack_test.go index cf2f45210..2e9541398 100644 --- a/pkg/pipeline/extract/conntrack/conntrack_test.go +++ b/pkg/pipeline/extract/conntrack/conntrack_test.go @@ -31,6 +31,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -898,7 +899,7 @@ func TestScheduling(t *testing.T) { assertStoreConsistency(t, ct) }) } - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, prometheus.DefaultGatherer) require.Contains(t, exposed, `conntrack_end_connections{group="0: Proto=17, ",reason="timeout"} 1`) } @@ -1123,7 +1124,7 @@ func TestDetectEndConnection(t *testing.T) { assertStoreConsistency(t, ct) }) } - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, prometheus.DefaultGatherer) require.Contains(t, exposed, `conntrack_tcp_flags{action="detectEndConnection"} 1`) require.Contains(t, exposed, `conntrack_input_records{classification="duplicate"} 1`) } @@ -1183,7 +1184,7 @@ func TestSwapAB(t *testing.T) { assertStoreConsistency(t, ct) }) } - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, prometheus.DefaultGatherer) require.Contains(t, exposed, `conntrack_tcp_flags{action="swapAB"} 1`) } @@ -1273,6 +1274,6 @@ func TestExpiringConnection(t *testing.T) { assertStoreConsistency(t, ct) }) } - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, prometheus.DefaultGatherer) require.Contains(t, exposed, `conntrack_end_connections{group="0: DEFAULT",reason="FIN_flag"} 1`) } diff --git a/pkg/pipeline/extract/conntrack/hash_test.go b/pkg/pipeline/extract/conntrack/hash_test.go index 8e24648d1..aaef254b8 100644 --- a/pkg/pipeline/extract/conntrack/hash_test.go +++ b/pkg/pipeline/extract/conntrack/hash_test.go @@ -24,6 +24,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -235,6 +236,6 @@ func TestComputeHash_MissingField(t *testing.T) { h, err := computeHash(fl, &keyDefinition, testHasher, metrics) require.NoError(t, err) require.NotNil(t, h) - exposed := test.ReadExposedMetrics(t) + exposed := test.ReadExposedMetrics(t, prometheus.DefaultGatherer) require.Contains(t, exposed, `conntrack_hash_errors{error="MissingFieldError",field="Missing"} 1`) } diff --git a/pkg/pipeline/extract/timebased/tables.go b/pkg/pipeline/extract/timebased/tables.go index 26d6bcc13..3bdb41dfe 100644 --- a/pkg/pipeline/extract/timebased/tables.go +++ b/pkg/pipeline/extract/timebased/tables.go @@ -40,10 +40,8 @@ func AddEntryToTables(indexKeyStructs map[string]*IndexKeyTable, entry config.Ge b.WriteRune(',') } if val, ok := entry[key]; ok { - valStr, err := utils.ConvertToString(val) - if err != nil { - log.Errorf("Cannot convert value to string %v : %v", val, err) - } else if len(valStr) > 0 { + valStr := utils.ConvertToString(val) + if len(valStr) > 0 { b.WriteString(valStr) validValuesCount++ } diff --git a/pkg/pipeline/transform/transform_filter.go b/pkg/pipeline/transform/transform_filter.go index 96a200897..230b559e0 100644 --- a/pkg/pipeline/transform/transform_filter.go +++ b/pkg/pipeline/transform/transform_filter.go @@ -126,7 +126,7 @@ func applyRule(entry config.GenericMap, labels map[string]string, rule *api.Tran entry[rule.AddFieldIf.Output+"_Evaluate"] = true } case api.AddLabel: - labels[rule.AddLabel.Input], _ = utils.ConvertToString(rule.AddLabel.Value) + labels[rule.AddLabel.Input] = utils.ConvertToString(rule.AddLabel.Value) case api.AddLabelIf: // TODO perhaps add a cache of previously evaluated expressions expressionString := fmt.Sprintf("val %s", rule.AddLabelIf.Parameters) diff --git a/pkg/pipeline/utils/timed_cache.go b/pkg/pipeline/utils/timed_cache.go index bac404664..bedc38c4f 100644 --- a/pkg/pipeline/utils/timed_cache.go +++ b/pkg/pipeline/utils/timed_cache.go @@ -87,7 +87,7 @@ func (tc *TimedCache) UpdateCacheEntry(key string, entry interface{}) bool { key: key, SourceEntry: entry, } - uclog.Debugf("adding entry: %#v", cEntry) + uclog.Tracef("adding entry: %#v", cEntry) // place at end of list cEntry.e = tc.cacheList.PushBack(cEntry) tc.cacheMap[key] = cEntry diff --git a/pkg/prometheus/prom_server.go b/pkg/prometheus/prom_server.go index 19a226583..6353c1801 100644 --- a/pkg/prometheus/prom_server.go +++ b/pkg/prometheus/prom_server.go @@ -18,25 +18,53 @@ package prometheus import ( + "context" "crypto/tls" + "errors" "fmt" "net/http" + "sync" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/server" prom "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + dto "github.com/prometheus/client_model/go" "github.com/sirupsen/logrus" ) var ( - plog = logrus.WithField("component", "prometheus") - maybePanic = plog.Fatalf + plog = logrus.WithField("component", "prometheus") + maybePanic = plog.Fatalf + SharedServer = &PromServer{} ) +type PromServer struct { + httpServer *http.Server + namedRegistries sync.Map +} + +func (ps *PromServer) Gather() ([]*dto.MetricFamily, error) { + all := prom.Gatherers{} + ps.namedRegistries.Range(func(_, value interface{}) bool { + r := value.(prom.Gatherer) + all = append(all, r) + return true + }) + return all.Gather() +} + +func (ps *PromServer) Shutdown(ctx context.Context) error { + return ps.httpServer.Shutdown(ctx) +} + +func (ps *PromServer) SetRegistry(name string, registry prom.Gatherer) { + ps.namedRegistries.Store(name, registry) +} + // InitializePrometheus starts the global Prometheus server, used for operational metrics and prom-encode stages if they don't override the server settings -func InitializePrometheus(settings *config.MetricsSettings) *http.Server { +func InitializePrometheus(settings *config.MetricsSettings) *PromServer { if settings.NoPanic { maybePanic = plog.Errorf } @@ -44,17 +72,17 @@ func InitializePrometheus(settings *config.MetricsSettings) *http.Server { plog.Info("Disabled global Prometheus server - no operational metrics will be available") return nil } + r := prom.DefaultGatherer if settings.SuppressGoMetrics { // set up private prometheus registry - r := prom.NewRegistry() - prom.DefaultRegisterer = r - prom.DefaultGatherer = r + r = prom.NewRegistry() } - return StartServerAsync(&settings.PromConnectionInfo, nil) + SharedServer = StartServerAsync(&settings.PromConnectionInfo, "", r) + return SharedServer } // StartServerAsync listens for prometheus resource usage requests -func StartServerAsync(conn *api.PromConnectionInfo, registry *prom.Registry) *http.Server { +func StartServerAsync(conn *api.PromConnectionInfo, regName string, registry prom.Gatherer) *PromServer { // create prometheus server for operational metrics // if value of address is empty, then by default it will take 0.0.0.0 port := conn.Port @@ -64,7 +92,7 @@ func StartServerAsync(conn *api.PromConnectionInfo, registry *prom.Registry) *ht addr := fmt.Sprintf("%s:%v", conn.Address, port) plog.Infof("StartServerAsync: addr = %s", addr) - httpServer := &http.Server{ + httpServer := http.Server{ Addr: addr, // TLS clients must use TLS 1.2 or higher TLSConfig: &tls.Config{ @@ -74,13 +102,8 @@ func StartServerAsync(conn *api.PromConnectionInfo, registry *prom.Registry) *ht // The Handler function provides a default handler to expose metrics // via an HTTP server. "/metrics" is the usual endpoint for that. mux := http.NewServeMux() - if registry == nil { - mux.Handle("/metrics", promhttp.Handler()) - } else { - mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) - } httpServer.Handler = mux - httpServer = server.Default(httpServer) + server.Default(&httpServer) go func() { var err error @@ -89,10 +112,15 @@ func StartServerAsync(conn *api.PromConnectionInfo, registry *prom.Registry) *ht } else { err = httpServer.ListenAndServe() } - if err != nil && err != http.ErrServerClosed { + if err != nil && !errors.Is(err, http.ErrServerClosed) { maybePanic("error in http.ListenAndServe: %v", err) } }() - return httpServer + p := PromServer{httpServer: &httpServer} + p.namedRegistries.Store(regName, registry) + + mux.Handle("/metrics", promhttp.HandlerFor(&p, promhttp.HandlerOpts{})) + + return &p } diff --git a/pkg/test/prom.go b/pkg/test/prom.go index de53b4ddf..c522da1dc 100644 --- a/pkg/test/prom.go +++ b/pkg/test/prom.go @@ -5,16 +5,17 @@ import ( "net/http/httptest" "testing" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/require" ) -func ReadExposedMetrics(t *testing.T) string { +func ReadExposedMetrics(t *testing.T, reg prometheus.Gatherer) string { req := httptest.NewRequest(http.MethodGet, "http://localhost:9090", nil) require.NotNil(t, req) w := httptest.NewRecorder() require.NotNil(t, w) - promhttp.Handler().ServeHTTP(w, req) + promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(w, req) require.NotNil(t, w.Body) return w.Body.String() } diff --git a/pkg/utils/convert.go b/pkg/utils/convert.go index 28f80b7ff..1c4f0db3f 100644 --- a/pkg/utils/convert.go +++ b/pkg/utils/convert.go @@ -243,27 +243,27 @@ func ConvertToBool(unk interface{}) (bool, error) { } } -func ConvertToString(unk interface{}) (string, error) { +func ConvertToString(unk interface{}) string { switch i := unk.(type) { case float64: - return strconv.FormatFloat(i, 'E', -1, 64), nil + return strconv.FormatFloat(i, 'E', -1, 64) case float32: - return strconv.FormatFloat(float64(i), 'E', -1, 32), nil + return strconv.FormatFloat(float64(i), 'E', -1, 32) case int64: - return strconv.FormatInt(i, 10), nil + return strconv.FormatInt(i, 10) case int32: - return strconv.FormatInt(int64(i), 10), nil + return strconv.FormatInt(int64(i), 10) case int: - return strconv.FormatInt(int64(i), 10), nil + return strconv.FormatInt(int64(i), 10) case uint64: - return strconv.FormatUint(i, 10), nil + return strconv.FormatUint(i, 10) case uint32: - return strconv.FormatUint(uint64(i), 10), nil + return strconv.FormatUint(uint64(i), 10) case uint: - return strconv.FormatUint(uint64(i), 10), nil + return strconv.FormatUint(uint64(i), 10) case string: - return i, nil + return i default: - return fmt.Sprintf("%v", unk), nil + return fmt.Sprintf("%v", unk) } }