diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 9780e1bea..9cdef8e1a 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -18,229 +18,82 @@ package encode import ( - "fmt" - "strings" "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" - putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus" - "github.com/netobserv/flowlogs-pipeline/pkg/utils" - "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" ) const defaultExpiryTime = time.Duration(2 * time.Minute) -type gaugeInfo struct { - gauge *prometheus.GaugeVec - info *MetricInfo -} - -type counterInfo struct { - counter *prometheus.CounterVec - info *MetricInfo -} - -type histoInfo struct { - histo *prometheus.HistogramVec - info *MetricInfo -} - type EncodeProm struct { - cfg *api.PromEncode - registerer prometheus.Registerer - gauges []gaugeInfo - counters []counterInfo - histos []histoInfo - aggHistos []histoInfo - expiryTime time.Duration - mCache *putils.TimedCache - mChacheLenMetric prometheus.Gauge - exitChan <-chan struct{} - metricsProcessed prometheus.Counter - metricsDropped prometheus.Counter - errorsCounter *prometheus.CounterVec + cfg *api.PromEncode + registerer prometheus.Registerer + metricCommon *MetricsCommonStruct } -var ( - MetricsProcessed = operational.DefineMetric( - "metrics_processed", - "Number of metrics processed", - operational.TypeCounter, - "stage", - ) - MetricsDropped = operational.DefineMetric( - "metrics_dropped", - "Number of metrics dropped", - operational.TypeCounter, - "stage", - ) - EncodePromErrors = operational.DefineMetric( - "encode_prom_errors", - "Total errors during metrics generation", - operational.TypeCounter, - "error", "metric", "key", - ) - mChacheLen = operational.DefineMetric( - "encode_prom_metrics_reported", - "Total number of prometheus metrics reported by this stage", - operational.TypeGauge, - "stage", - ) -) - -// Encode encodes a metric before being stored +// Encode encodes a metric before being stored; the heavy work is done by the MetricCommonEncode func (e *EncodeProm) Encode(metricRecord config.GenericMap) { log.Tracef("entering EncodeMetric. metricRecord = %v", metricRecord) - - // Process counters - for _, mInfo := range e.counters { - labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.counter.MetricVec) - if labels == nil { - continue - } - m, err := mInfo.counter.GetMetricWith(labels) - if err != nil { - log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() - continue - } - m.Add(value) - e.metricsProcessed.Inc() - } - - // Process gauges - for _, mInfo := range e.gauges { - labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.gauge.MetricVec) - if labels == nil { - continue - } - m, err := mInfo.gauge.GetMetricWith(labels) - if err != nil { - log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() - continue - } - m.Set(value) - e.metricsProcessed.Inc() - } - - // Process histograms - for _, mInfo := range e.histos { - labels, value := e.prepareMetric(metricRecord, mInfo.info, mInfo.histo.MetricVec) - if labels == nil { - continue - } - m, err := mInfo.histo.GetMetricWith(labels) - if err != nil { - log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() - continue - } - m.Observe(value) - e.metricsProcessed.Inc() - } - - // Process pre-aggregated histograms - for _, mInfo := range e.aggHistos { - labels, values := e.prepareAggHisto(metricRecord, mInfo.info, mInfo.histo.MetricVec) - if labels == nil { - continue - } - m, err := mInfo.histo.GetMetricWith(labels) - if err != nil { - log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) - e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() - continue - } - for _, v := range values { - m.Observe(v) - } - e.metricsProcessed.Inc() - } + e.metricCommon.MetricCommonEncode(e, metricRecord) } -func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *MetricInfo, m *prometheus.MetricVec) (map[string]string, float64) { - val := e.extractGenericValue(flow, info) - if val == nil { - return nil, 0 - } - floatVal, err := utils.ConvertToFloat64(val) +func (e *EncodeProm) ProcessCounter(m interface{}, labels map[string]string, value float64) error { + counter := m.(*prometheus.CounterVec) + mm, err := counter.GetMetricWith(labels) if err != nil { - e.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc() - return nil, 0 - } - if info.ValueScale != 0 { - floatVal = floatVal / info.ValueScale + return err } - - entryLabels, key := ExtractLabelsAndKey(flow, &info.MetricsItem) - // Update entry for expiry mechanism (the entry itself is its own cleanup function) - _, ok := e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) }) - if !ok { - e.metricsDropped.Inc() - return nil, 0 - } - return entryLabels, floatVal + mm.Add(value) + return nil } -func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *MetricInfo, m *prometheus.MetricVec) (map[string]string, []float64) { - val := e.extractGenericValue(flow, info) - if val == nil { - return nil, nil - } - values, ok := val.([]float64) - if !ok { - e.errorsCounter.WithLabelValues("HistoValueConversionError", info.Name, info.ValueKey).Inc() - return nil, nil +func (e *EncodeProm) ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error { + gauge := m.(*prometheus.GaugeVec) + mm, err := gauge.GetMetricWith(labels) + if err != nil { + return err } + mm.Set(value) + return nil +} - entryLabels, key := ExtractLabelsAndKey(flow, &info.MetricsItem) - // Update entry for expiry mechanism (the entry itself is its own cleanup function) - _, ok = e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) }) - if !ok { - e.metricsDropped.Inc() - return nil, nil +func (e *EncodeProm) ProcessHist(m interface{}, labels map[string]string, value float64) error { + hist := m.(*prometheus.HistogramVec) + mm, err := hist.GetMetricWith(labels) + if err != nil { + return err } - return entryLabels, values + mm.Observe(value) + return nil } -func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *MetricInfo) interface{} { - for _, pred := range info.FilterPredicates { - if !pred(flow) { - return nil - } - } - if info.ValueKey == "" { - // No value key means it's a records / flows counter (1 flow = 1 increment), so just return 1 - return 1 +func (e *EncodeProm) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error { + hist := m.(*prometheus.HistogramVec) + mm, err := hist.GetMetricWith(labels) + if err != nil { + return err } - val, found := flow[info.ValueKey] - if !found { - e.errorsCounter.WithLabelValues("RecordKeyMissing", info.Name, info.ValueKey).Inc() - return nil + for _, v := range values { + mm.Observe(v) } - return val + return nil } -func ExtractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[string]string, string) { - entryLabels := make(map[string]string, len(info.Labels)) - key := strings.Builder{} - key.WriteString(info.Name) - key.WriteRune('|') - for _, t := range info.Labels { - entryLabels[t] = "" - if v, ok := flow[t]; ok { - entryLabels[t] = fmt.Sprintf("%v", v) - } - key.WriteString(entryLabels[t]) - key.WriteRune('|') +func (e *EncodeProm) GetChacheEntry(entryLabels map[string]string, m interface{}) interface{} { + switch mv := m.(type) { + case *prometheus.CounterVec: + return func() { mv.Delete(entryLabels) } + case *prometheus.GaugeVec: + return func() { mv.Delete(entryLabels) } + case *prometheus.HistogramVec: + return func() { mv.Delete(entryLabels) } } - return entryLabels, key.String() + return nil } // callback function from lru cleanup @@ -248,19 +101,6 @@ func (e *EncodeProm) Cleanup(cleanupFunc interface{}) { cleanupFunc.(func())() } -func (e *EncodeProm) cleanupExpiredEntriesLoop() { - ticker := time.NewTicker(e.expiryTime) - for { - select { - case <-e.exitChan: - log.Debugf("exiting cleanupExpiredEntriesLoop because of signal") - return - case <-ticker.C: - e.mCache.CleanupExpiredEntries(e.expiryTime, e.Cleanup) - } - } -} - func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (Encoder, error) { cfg := api.PromEncode{} if params.Encode != nil && params.Encode.Prom != nil { @@ -282,11 +122,13 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En } else { registerer = prometheus.DefaultRegisterer } + w := &EncodeProm{ + cfg: params.Encode.Prom, + registerer: registerer, + } - counters := []counterInfo{} - gauges := []gaugeInfo{} - histos := []histoInfo{} - aggHistos := []histoInfo{} + metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup) + w.metricCommon = metricCommon for _, mCfg := range cfg.Metrics { fullMetricName := cfg.Prefix + mCfg.Name @@ -302,10 +144,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En log.Errorf("error during prometheus.Register: %v", err) return nil, err } - counters = append(counters, counterInfo{ - counter: counter, - info: mInfo, - }) + metricCommon.AddCounter(counter, mInfo) case api.MetricEncodeOperationName("Gauge"): gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: ""}, labels) err := registerer.Register(gauge) @@ -313,10 +152,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En log.Errorf("error during prometheus.Register: %v", err) return nil, err } - gauges = append(gauges, gaugeInfo{ - gauge: gauge, - info: mInfo, - }) + metricCommon.AddGauge(gauge, mInfo) case api.MetricEncodeOperationName("Histogram"): log.Debugf("buckets = %v", mCfg.Buckets) hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels) @@ -325,10 +161,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En log.Errorf("error during prometheus.Register: %v", err) return nil, err } - histos = append(histos, histoInfo{ - histo: hist, - info: mInfo, - }) + metricCommon.AddHist(hist, mInfo) case api.MetricEncodeOperationName("AggHistogram"): log.Debugf("buckets = %v", mCfg.Buckets) hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: "", Buckets: mCfg.Buckets}, labels) @@ -337,38 +170,11 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En log.Errorf("error during prometheus.Register: %v", err) return nil, err } - aggHistos = append(aggHistos, histoInfo{ - histo: hist, - info: mInfo, - }) + metricCommon.AddAggHist(hist, mInfo) case "default": log.Errorf("invalid metric type = %v, skipping", mCfg.Type) continue } } - - log.Debugf("counters = %v", counters) - log.Debugf("gauges = %v", gauges) - log.Debugf("histos = %v", histos) - log.Debugf("aggHistos = %v", aggHistos) - - mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, params.Name) - - w := &EncodeProm{ - cfg: params.Encode.Prom, - registerer: registerer, - counters: counters, - gauges: gauges, - histos: histos, - aggHistos: aggHistos, - expiryTime: expiryTime.Duration, - mCache: putils.NewTimedCache(cfg.MaxMetrics, mChacheLenMetric), - mChacheLenMetric: mChacheLenMetric, - exitChan: putils.ExitChannel(), - metricsProcessed: opMetrics.NewCounter(&MetricsProcessed, params.Name), - metricsDropped: opMetrics.NewCounter(&MetricsDropped, params.Name), - errorsCounter: opMetrics.NewCounterVec(&EncodePromErrors), - } - go w.cleanupExpiredEntriesLoop() return w, nil } diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 6a2f81a83..2de05e4d0 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -93,30 +93,30 @@ func Test_NewEncodeProm(t *testing.T) { encodeProm, err := initProm(cfg.Parameters[0].Encode.Prom) require.NoError(t, err) - require.Equal(t, 1, len(encodeProm.counters)) - require.Equal(t, 1, len(encodeProm.gauges)) - require.Equal(t, 1, len(encodeProm.histos)) - require.Equal(t, 1, len(encodeProm.aggHistos)) - require.Equal(t, time.Second, encodeProm.expiryTime) + require.Equal(t, 1, len(encodeProm.metricCommon.counters)) + require.Equal(t, 1, len(encodeProm.metricCommon.gauges)) + require.Equal(t, 1, len(encodeProm.metricCommon.histos)) + require.Equal(t, 1, len(encodeProm.metricCommon.aggHistos)) + require.Equal(t, time.Second, encodeProm.metricCommon.expiryTime) - require.Equal(t, encodeProm.gauges[0].info.Name, "Bytes") + require.Equal(t, encodeProm.metricCommon.gauges[0].info.Name, "Bytes") expectedList := []string{"srcAddr", "dstAddr", "srcPort"} - require.Equal(t, encodeProm.gauges[0].info.Labels, expectedList) + require.Equal(t, encodeProm.metricCommon.gauges[0].info.Labels, expectedList) - require.Equal(t, encodeProm.counters[0].info.Name, "Packets") + require.Equal(t, encodeProm.metricCommon.counters[0].info.Name, "Packets") expectedList = []string{"srcAddr", "dstAddr", "dstPort"} - require.Equal(t, encodeProm.counters[0].info.Labels, expectedList) + require.Equal(t, encodeProm.metricCommon.counters[0].info.Labels, expectedList) entry := test.GetExtractMockEntry() encodeProm.Encode(entry) // verify entries are in cache; one for the gauge and one for the counter - entriesMapLen := encodeProm.mCache.GetCacheLen() + entriesMapLen := encodeProm.metricCommon.mCache.GetCacheLen() require.Equal(t, 2, entriesMapLen) // wait a couple seconds so that the entry will expire time.Sleep(2 * time.Second) - encodeProm.mCache.CleanupExpiredEntries(encodeProm.expiryTime, encodeProm.Cleanup) - entriesMapLen = encodeProm.mCache.GetCacheLen() + encodeProm.metricCommon.mCache.CleanupExpiredEntries(encodeProm.metricCommon.expiryTime, encodeProm.Cleanup) + entriesMapLen = encodeProm.metricCommon.mCache.GetCacheLen() require.Equal(t, 0, entriesMapLen) } @@ -631,18 +631,18 @@ func Test_MultipleProm(t *testing.T) { encodeProm3, err := initProm(cfg.Parameters[2].Encode.Prom) require.NoError(t, err) - require.Equal(t, 0, len(encodeProm1.counters)) - require.Equal(t, 1, len(encodeProm1.gauges)) - require.Equal(t, 0, len(encodeProm1.histos)) - require.Equal(t, 0, len(encodeProm1.aggHistos)) - require.Equal(t, 1, len(encodeProm2.counters)) - require.Equal(t, 0, len(encodeProm2.gauges)) - require.Equal(t, 0, len(encodeProm2.histos)) - require.Equal(t, 0, len(encodeProm2.aggHistos)) - require.Equal(t, 0, len(encodeProm3.counters)) - require.Equal(t, 0, len(encodeProm3.gauges)) - require.Equal(t, 1, len(encodeProm3.histos)) - require.Equal(t, 0, len(encodeProm3.aggHistos)) + require.Equal(t, 0, len(encodeProm1.metricCommon.counters)) + require.Equal(t, 1, len(encodeProm1.metricCommon.gauges)) + require.Equal(t, 0, len(encodeProm1.metricCommon.histos)) + require.Equal(t, 0, len(encodeProm1.metricCommon.aggHistos)) + require.Equal(t, 1, len(encodeProm2.metricCommon.counters)) + require.Equal(t, 0, len(encodeProm2.metricCommon.gauges)) + require.Equal(t, 0, len(encodeProm2.metricCommon.histos)) + require.Equal(t, 0, len(encodeProm2.metricCommon.aggHistos)) + require.Equal(t, 0, len(encodeProm3.metricCommon.counters)) + require.Equal(t, 0, len(encodeProm3.metricCommon.gauges)) + require.Equal(t, 1, len(encodeProm3.metricCommon.histos)) + require.Equal(t, 0, len(encodeProm3.metricCommon.aggHistos)) require.Equal(t, "test1_", encodeProm1.cfg.Prefix) require.Equal(t, "test2_", encodeProm2.cfg.Prefix) diff --git a/pkg/pipeline/encode/metrics_common.go b/pkg/pipeline/encode/metrics_common.go new file mode 100644 index 000000000..c90a5a2e9 --- /dev/null +++ b/pkg/pipeline/encode/metrics_common.go @@ -0,0 +1,279 @@ +/* + * Copyright (C) 2024 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package encode + +import ( + "fmt" + "strings" + "time" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" + "github.com/netobserv/flowlogs-pipeline/pkg/utils" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" +) + +type mInfoStruct struct { + genericMetric interface{} // can be a counter, gauge, or histogram pointer + info *MetricInfo +} + +type MetricsCommonStruct struct { + gauges []mInfoStruct + counters []mInfoStruct + histos []mInfoStruct + aggHistos []mInfoStruct + mCache *putils.TimedCache + mChacheLenMetric prometheus.Gauge + metricsProcessed prometheus.Counter + metricsDropped prometheus.Counter + errorsCounter *prometheus.CounterVec + expiryTime time.Duration + exitChan <-chan struct{} +} + +type MetricsCommonInterface interface { + GetChacheEntry(entryLabels map[string]string, m interface{}) interface{} + ProcessCounter(m interface{}, labels map[string]string, value float64) error + ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error + ProcessHist(m interface{}, labels map[string]string, value float64) error + ProcessAggHist(m interface{}, labels map[string]string, value []float64) error +} + +var ( + metricsProcessed = operational.DefineMetric( + "metrics_processed", + "Number of metrics processed", + operational.TypeCounter, + "stage", + ) + metricsDropped = operational.DefineMetric( + "metrics_dropped", + "Number of metrics dropped", + operational.TypeCounter, + "stage", + ) + encodePromErrors = operational.DefineMetric( + "encode_prom_errors", + "Total errors during metrics generation", + operational.TypeCounter, + "error", "metric", "key", + ) + mChacheLen = operational.DefineMetric( + "encode_prom_metrics_reported", + "Total number of prometheus metrics reported by this stage", + operational.TypeGauge, + "stage", + ) +) + +func (m *MetricsCommonStruct) AddCounter(g interface{}, info *MetricInfo) { + mStruct := mInfoStruct{genericMetric: g, info: info} + m.counters = append(m.counters, mStruct) +} + +func (m *MetricsCommonStruct) AddGauge(g interface{}, info *MetricInfo) { + mStruct := mInfoStruct{genericMetric: g, info: info} + m.gauges = append(m.gauges, mStruct) +} + +func (m *MetricsCommonStruct) AddHist(g interface{}, info *MetricInfo) { + mStruct := mInfoStruct{genericMetric: g, info: info} + m.histos = append(m.histos, mStruct) +} + +func (m *MetricsCommonStruct) AddAggHist(g interface{}, info *MetricInfo) { + mStruct := mInfoStruct{genericMetric: g, info: info} + m.aggHistos = append(m.aggHistos, mStruct) +} + +func (e *MetricsCommonStruct) MetricCommonEncode(mci MetricsCommonInterface, metricRecord config.GenericMap) { + log.Tracef("entering MetricCommonEncode. metricRecord = %v", metricRecord) + + // Process counters + for _, mInfo := range e.counters { + labels, value, _ := e.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric) + if labels == nil { + continue + } + err := mci.ProcessCounter(mInfo.genericMetric, labels, value) + if err != nil { + log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) + e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + continue + } + e.metricsProcessed.Inc() + } + + // Process gauges + for _, mInfo := range e.gauges { + labels, value, key := e.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric) + if labels == nil { + continue + } + err := mci.ProcessGauge(mInfo.genericMetric, labels, value, key) + if err != nil { + log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) + e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + continue + } + e.metricsProcessed.Inc() + } + + // Process histograms + for _, mInfo := range e.histos { + labels, value, _ := e.prepareMetric(mci, metricRecord, mInfo.info, mInfo.genericMetric) + if labels == nil { + continue + } + err := mci.ProcessHist(mInfo.genericMetric, labels, value) + if err != nil { + log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) + e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + continue + } + e.metricsProcessed.Inc() + } + + // Process pre-aggregated histograms + for _, mInfo := range e.aggHistos { + labels, values := e.prepareAggHisto(mci, metricRecord, mInfo.info, mInfo.genericMetric) + if labels == nil { + continue + } + err := mci.ProcessAggHist(mInfo.genericMetric, labels, values) + if err != nil { + log.Errorf("labels registering error on %s: %v", mInfo.info.Name, err) + e.errorsCounter.WithLabelValues("LabelsRegisteringError", mInfo.info.Name, "").Inc() + continue + } + e.metricsProcessed.Inc() + } +} + +func (m *MetricsCommonStruct) prepareMetric(mci MetricsCommonInterface, flow config.GenericMap, info *MetricInfo, mv interface{}) (map[string]string, float64, string) { + val := m.extractGenericValue(flow, info) + if val == nil { + return nil, 0, "" + } + floatVal, err := utils.ConvertToFloat64(val) + if err != nil { + m.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc() + return nil, 0, "" + } + if info.ValueScale != 0 { + floatVal = floatVal / info.ValueScale + } + + entryLabels, key := extractLabelsAndKey(flow, &info.MetricsItem) + // Update entry for expiry mechanism (the entry itself is its own cleanup function) + cacheEntry := mci.GetChacheEntry(entryLabels, mv) + _, ok := m.mCache.UpdateCacheEntry(key, cacheEntry) + if !ok { + m.metricsDropped.Inc() + return nil, 0, "" + } + return entryLabels, floatVal, key +} + +func (m *MetricsCommonStruct) prepareAggHisto(mci MetricsCommonInterface, flow config.GenericMap, info *MetricInfo, mc interface{}) (map[string]string, []float64) { + val := m.extractGenericValue(flow, info) + if val == nil { + return nil, nil + } + values, ok := val.([]float64) + if !ok { + m.errorsCounter.WithLabelValues("HistoValueConversionError", info.Name, info.ValueKey).Inc() + return nil, nil + } + + entryLabels, key := extractLabelsAndKey(flow, &info.MetricsItem) + // Update entry for expiry mechanism (the entry itself is its own cleanup function) + cacheEntry := mci.GetChacheEntry(entryLabels, mc) + _, ok = m.mCache.UpdateCacheEntry(key, cacheEntry) + if !ok { + m.metricsDropped.Inc() + return nil, nil + } + return entryLabels, values +} + +func (m *MetricsCommonStruct) extractGenericValue(flow config.GenericMap, info *MetricInfo) interface{} { + for _, pred := range info.FilterPredicates { + if !pred(flow) { + return nil + } + } + if info.ValueKey == "" { + // No value key means it's a records / flows counter (1 flow = 1 increment), so just return 1 + return 1 + } + val, found := flow[info.ValueKey] + if !found { + m.errorsCounter.WithLabelValues("RecordKeyMissing", info.Name, info.ValueKey).Inc() + return nil + } + return val +} + +func extractLabelsAndKey(flow config.GenericMap, info *api.MetricsItem) (map[string]string, string) { + entryLabels := make(map[string]string, len(info.Labels)) + key := strings.Builder{} + key.WriteString(info.Name) + key.WriteRune('|') + for _, t := range info.Labels { + entryLabels[t] = "" + if v, ok := flow[t]; ok { + entryLabels[t] = fmt.Sprintf("%v", v) + } + key.WriteString(entryLabels[t]) + key.WriteRune('|') + } + return entryLabels, key.String() +} + +func (m *MetricsCommonStruct) cleanupExpiredEntriesLoop(callback putils.CacheCallback) { + ticker := time.NewTicker(m.expiryTime) + for { + select { + case <-m.exitChan: + log.Debugf("exiting cleanupExpiredEntriesLoop because of signal") + return + case <-ticker.C: + m.mCache.CleanupExpiredEntries(m.expiryTime, callback) + } + } +} + +func NewMetricsCommonStruct(opMetrics *operational.Metrics, maxCacheEntries int, name string, expiryTime api.Duration, callback putils.CacheCallback) *MetricsCommonStruct { + mChacheLenMetric := opMetrics.NewGauge(&mChacheLen, name) + m := &MetricsCommonStruct{ + mCache: putils.NewTimedCache(maxCacheEntries, mChacheLenMetric), + mChacheLenMetric: mChacheLenMetric, + metricsProcessed: opMetrics.NewCounter(&metricsProcessed, name), + metricsDropped: opMetrics.NewCounter(&metricsDropped, name), + errorsCounter: opMetrics.NewCounterVec(&encodePromErrors), + expiryTime: expiryTime.Duration, + exitChan: putils.ExitChannel(), + } + go m.cleanupExpiredEntriesLoop(callback) + return m +} diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go b/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go index 5d882ebef..1864f3287 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go @@ -50,10 +50,10 @@ type fakeOltpLoggerProvider struct { type fakeOltpLogger struct { } -var receivedData []logs.LogRecord +var otlpReceivedData []logs.LogRecord func (f *fakeOltpLogger) Emit(msg logs.LogRecord) { - receivedData = append(receivedData, msg) + otlpReceivedData = append(otlpReceivedData, msg) } func (f *fakeOltpLoggerProvider) Logger(name string, options ...logs.LoggerOption) logs.Logger { @@ -61,7 +61,7 @@ func (f *fakeOltpLoggerProvider) Logger(name string, options ...logs.LoggerOptio } func initNewEncodeOtlpLogs(t *testing.T) encode.Encoder { - receivedData = []logs.LogRecord{} + otlpReceivedData = []logs.LogRecord{} v, cfg := test.InitConfig(t, testOtlpConfig) require.NotNil(t, v) @@ -88,14 +88,14 @@ func Test_EncodeOtlpLogs(t *testing.T) { newEncode.Encode(entry1) newEncode.Encode(entry2) // verify contents of the output - require.Len(t, receivedData, 2) + require.Len(t, otlpReceivedData, 2) expected1, _ := json.Marshal(entry1) expected2, _ := json.Marshal(entry2) - require.Equal(t, string(expected1), *receivedData[0].Body()) - require.Equal(t, string(expected2), *receivedData[1].Body()) + require.Equal(t, string(expected1), *otlpReceivedData[0].Body()) + require.Equal(t, string(expected2), *otlpReceivedData[1].Body()) } -func Test_TLSConfigEmpty(t *testing.T) { +func Test_EncodeOtlpTLSConfigEmpty(t *testing.T) { cfg := config.StageParam{ Encode: &config.Encode{ OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{ @@ -111,7 +111,7 @@ func Test_TLSConfigEmpty(t *testing.T) { require.NotNil(t, newEncode) } -func Test_TLSConfigNotEmpty(t *testing.T) { +func Test_EncodeOtlpTLSConfigNotEmpty(t *testing.T) { cfg := config.StageParam{ Encode: &config.Encode{ OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{ @@ -193,9 +193,5 @@ func Test_EncodeOtlpMetrics(t *testing.T) { newEncode, err := NewEncodeOtlpMetrics(operational.NewMetrics(&config.MetricsSettings{}), cfg) require.NoError(t, err) require.NotNil(t, newEncode) - em := newEncode.(*EncodeOtlpMetrics) - require.Equal(t, 2, len(em.counters)) - require.Equal(t, 1, len(em.gauges)) - require.Equal(t, "metric3", em.counters[1].info.Name) - require.Equal(t, []string{"label21", "label22"}, em.gauges[0].info.Labels) + // TODO: add more tests } diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go index 739a7d4e2..c645bd53d 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go @@ -25,9 +25,6 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" - putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" - "github.com/netobserv/flowlogs-pipeline/pkg/utils" - "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -36,116 +33,60 @@ import ( "go.opentelemetry.io/otel/sdk/resource" ) -// TODO: Refactor the code that is common with encode_prom -const ( - defaultExpiryTime = time.Duration(2 * time.Minute) - flpMeterName = "flp_meter" -) - -type counterInfo struct { - counter *metric.Float64Counter - info *encode.MetricInfo -} - -type gaugeInfo struct { - gauge *metric.Float64ObservableGauge - info *encode.MetricInfo - obs Float64Gauge -} - -// TBD: Handle histograms -/* -type histoInfo struct { - histo *metric.Float64Histogram - info *encode.MetricInfo -} -*/ +const defaultExpiryTime = time.Duration(2 * time.Minute) +const flpMeterName = "flp_meter" type EncodeOtlpMetrics struct { - cfg api.EncodeOtlpMetrics - ctx context.Context - res *resource.Resource - mp *sdkmetric.MeterProvider - counters []counterInfo - gauges []gaugeInfo - //histos []histoInfo - //aggHistos []histoInfo - expiryTime time.Duration - mCache *putils.TimedCache - exitChan <-chan struct{} - meter metric.Meter - metricsProcessed prometheus.Counter - metricsDropped prometheus.Counter - errorsCounter *prometheus.CounterVec + cfg api.EncodeOtlpMetrics + ctx context.Context + res *resource.Resource + mp *sdkmetric.MeterProvider + meter metric.Meter + metricCommon *encode.MetricsCommonStruct } // Encode encodes a metric to be exported func (e *EncodeOtlpMetrics) Encode(metricRecord config.GenericMap) { log.Tracef("entering EncodeOtlpMetrics. entry = %v", metricRecord) + e.metricCommon.MetricCommonEncode(e, metricRecord) +} - // Process counters - for _, mInfo := range e.counters { - labels, value, _ := e.prepareMetric(metricRecord, mInfo.info) - if labels == nil { - continue - } - // set attributes using the labels - attributes := obtainAttributesFromLabels(labels) - (*mInfo.counter).Add(e.ctx, value, metric.WithAttributes(attributes...)) - e.metricsProcessed.Inc() - } +func (e *EncodeOtlpMetrics) ProcessCounter(m interface{}, labels map[string]string, value float64) error { + counter := m.(metric.Float64Counter) + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + counter.Add(e.ctx, value, metric.WithAttributes(attributes...)) + return nil +} - // Process gauges - for _, mInfo := range e.gauges { - labels, value, key := e.prepareMetric(metricRecord, mInfo.info) - if labels == nil { - continue - } - // set attributes using the labels - attributes := obtainAttributesFromLabels(labels) - mInfo.obs.Set(key, value, attributes) - e.metricsProcessed.Inc() - } - // TBD: Process histograms +func (e *EncodeOtlpMetrics) ProcessGauge(m interface{}, labels map[string]string, value float64, key string) error { + obs := m.(Float64Gauge) + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + obs.Set(key, value, attributes) + return nil } -func (e *EncodeOtlpMetrics) prepareMetric(flow config.GenericMap, info *encode.MetricInfo) (map[string]string, float64, string) { - val := e.extractGenericValue(flow, info) - if val == nil { - return nil, 0, "" - } - floatVal, err := utils.ConvertToFloat64(val) - if err != nil { - e.errorsCounter.WithLabelValues("ValueConversionError", info.Name, info.ValueKey).Inc() - return nil, 0, "" - } +func (e *EncodeOtlpMetrics) ProcessHist(m interface{}, labels map[string]string, value float64) error { + histo := m.(metric.Float64Histogram) + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + histo.Record(e.ctx, value, metric.WithAttributes(attributes...)) + return nil +} - entryLabels, key := encode.ExtractLabelsAndKey(flow, &info.MetricsItem) - // Update entry for expiry mechanism (the entry itself is its own cleanup function) - _, ok := e.mCache.UpdateCacheEntry(key, entryLabels) - if !ok { - e.metricsDropped.Inc() - return nil, 0, "" +func (e *EncodeOtlpMetrics) ProcessAggHist(m interface{}, labels map[string]string, values []float64) error { + histo := m.(metric.Float64Histogram) + // set attributes using the labels + attributes := obtainAttributesFromLabels(labels) + for _, v := range values { + histo.Record(e.ctx, v, metric.WithAttributes(attributes...)) } - return entryLabels, floatVal, key + return nil } -func (e *EncodeOtlpMetrics) extractGenericValue(flow config.GenericMap, info *encode.MetricInfo) interface{} { - for _, pred := range info.FilterPredicates { - if !pred(flow) { - return nil - } - } - if info.ValueKey == "" { - // No value key means it's a records / flows counter (1 flow = 1 increment), so just return 1 - return 1 - } - val, found := flow[info.ValueKey] - if !found { - e.errorsCounter.WithLabelValues("RecordKeyMissing", info.Name, info.ValueKey).Inc() - return nil - } - return val +func (e *EncodeOtlpMetrics) GetChacheEntry(entryLabels map[string]string, m interface{}) interface{} { + return entryLabels } func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) { @@ -173,8 +114,17 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar } meterFactory := otel.Meter(flpMeterName) - counters := []counterInfo{} - gauges := []gaugeInfo{} + + w := &EncodeOtlpMetrics{ + cfg: cfg, + ctx: ctx, + res: res, + mp: mp, + meter: meterFactory, + } + + metricCommon := encode.NewMetricsCommonStruct(opMetrics, 0, params.Name, expiryTime, nil) + w.metricCommon = metricCommon for _, mCfg := range cfg.Metrics { fullMetricName := cfg.Prefix + mCfg.Name @@ -189,10 +139,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar log.Errorf("error during counter creation: %v", err) return nil, err } - counters = append(counters, counterInfo{ - counter: &counter, - info: mInfo, - }) + metricCommon.AddCounter(counter, mInfo) case api.MetricEncodeOperationName("Gauge"): // at implementation time, only asynchronous gauges are supported by otel in golang obs := Float64Gauge{observations: make(map[string]Float64GaugeEntry)} @@ -204,56 +151,31 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar log.Errorf("error during gauge creation: %v", err) return nil, err } - gInfo := gaugeInfo{ - info: mInfo, - obs: obs, - gauge: &gauge, + metricCommon.AddGauge(gauge, mInfo) + case api.MetricEncodeOperationName("Histogram"): + var histo metric.Float64Histogram + if len(mCfg.Buckets) == 0 { + histo, err = meter.Float64Histogram(fullMetricName) + } else { + histo, err = meter.Float64Histogram(fullMetricName, + metric.WithExplicitBucketBoundaries(mCfg.Buckets...), + ) + + } + if err != nil { + log.Errorf("error during histogram creation: %v", err) + return nil, err } - gauges = append(gauges, gInfo) - // TBD: handle histograms + metricCommon.AddHist(histo, mInfo) case "default": log.Errorf("invalid metric type = %v, skipping", mCfg.Type) continue } } - w := &EncodeOtlpMetrics{ - cfg: cfg, - ctx: ctx, - res: res, - mp: mp, - meter: meterFactory, - counters: counters, - gauges: gauges, - expiryTime: expiryTime.Duration, - mCache: putils.NewTimedCache(0, nil), - exitChan: putils.ExitChannel(), - metricsProcessed: opMetrics.NewCounter(&encode.MetricsProcessed, params.Name), - metricsDropped: opMetrics.NewCounter(&encode.MetricsDropped, params.Name), - errorsCounter: opMetrics.NewCounterVec(&encode.EncodePromErrors), - } - go w.cleanupExpiredEntriesLoop() return w, nil } -// Cleanup - callback function from lru cleanup -func (e *EncodeOtlpMetrics) Cleanup(cleanupFunc interface{}) { - // nothing more to do -} - -func (e *EncodeOtlpMetrics) cleanupExpiredEntriesLoop() { - ticker := time.NewTicker(e.expiryTime) - for { - select { - case <-e.exitChan: - log.Debugf("exiting cleanupExpiredEntriesLoop because of signal") - return - case <-ticker.C: - e.mCache.CleanupExpiredEntries(e.expiryTime, e.Cleanup) - } - } -} - // At present, golang only supports asynchronous gauge, so we have some function here to support this type Float64GaugeEntry struct { diff --git a/pkg/pipeline/encode/prom_cache_test.go b/pkg/pipeline/encode/prom_cache_test.go index bab6d71e2..649b310e4 100644 --- a/pkg/pipeline/encode/prom_cache_test.go +++ b/pkg/pipeline/encode/prom_cache_test.go @@ -113,12 +113,12 @@ func Test_Prom_Cache1(t *testing.T) { entries = utils.GenerateConnectionFlowEntries(10) require.Equal(t, 10, len(entries)) encodeEntries(promEncode, entries) - require.Equal(t, 10, promEncode.mCache.GetCacheLen()) + require.Equal(t, 10, promEncode.metricCommon.mCache.GetCacheLen()) entries = utils.GenerateConnectionFlowEntries(40) require.Equal(t, 40, len(entries)) encodeEntries(promEncode, entries) - require.Equal(t, 30, promEncode.mCache.GetCacheLen()) + require.Equal(t, 30, promEncode.metricCommon.mCache.GetCacheLen()) } func Test_Prom_Cache2(t *testing.T) { @@ -133,12 +133,12 @@ func Test_Prom_Cache2(t *testing.T) { entries = utils.GenerateConnectionFlowEntries(10) require.Equal(t, 10, len(entries)) encodeEntries(promEncode, entries) - require.Equal(t, 20, promEncode.mCache.GetCacheLen()) + require.Equal(t, 20, promEncode.metricCommon.mCache.GetCacheLen()) entries = utils.GenerateConnectionFlowEntries(40) require.Equal(t, 40, len(entries)) encodeEntries(promEncode, entries) - require.Equal(t, 30, promEncode.mCache.GetCacheLen()) + require.Equal(t, 30, promEncode.metricCommon.mCache.GetCacheLen()) } func Test_Prom_Cache3(t *testing.T) { @@ -153,10 +153,10 @@ func Test_Prom_Cache3(t *testing.T) { entries = utils.GenerateConnectionFlowEntries(10) require.Equal(t, 10, len(entries)) encodeEntries(promEncode, entries) - require.Equal(t, 20, promEncode.mCache.GetCacheLen()) + require.Equal(t, 20, promEncode.metricCommon.mCache.GetCacheLen()) entries = utils.GenerateConnectionFlowEntries(40) require.Equal(t, 40, len(entries)) encodeEntries(promEncode, entries) - require.Equal(t, 80, promEncode.mCache.GetCacheLen()) + require.Equal(t, 80, promEncode.metricCommon.mCache.GetCacheLen()) }